Upgrade of BBS-ep service
Upgraded service to use latest DCAE-SDK
Upgraded many of the dependencies to latest versions
Introduced Java 11
Change-Id: I29d265d2a75aa80749f567cfb10920b2c45c2cec
Issue-ID: DCAEGEN2-2105
Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
diff --git a/components/bbs-event-processor/Dockerfile b/components/bbs-event-processor/Dockerfile
index e799bd9..a9e6a89 100644
--- a/components/bbs-event-processor/Dockerfile
+++ b/components/bbs-event-processor/Dockerfile
@@ -1,4 +1,4 @@
-FROM openjdk:8-jre-alpine
+FROM openjdk:11-jre-slim
ARG PROJECT_BUILD_DIR_NAME
ARG FINAL_JAR
@@ -6,11 +6,14 @@
ARG DOCKER_ARTIFACT_DIR
#Add a new user and group to allow container to be run as non-root
-RUN addgroup -S bbs-ep && adduser -S -G bbs-ep bbs-ep
+RUN addgroup --system bbs-ep && adduser --system --ingroup bbs-ep bbs-ep
#Copy dependencies and executable jar
WORKDIR ${DOCKER_ARTIFACT_DIR}
COPY ${PROJECT_BUILD_DIR_NAME}/${FINAL_JAR} .
+COPY KeyStore.jks .
+COPY KeyStorePass.txt .
+
#Overcome Docker limitation to put ARG inside ENTRYPOINT
RUN ln -s ${FINAL_JAR} bbs-ep.jar
COPY ${PROJECT_BUILD_DIR_NAME}/${DEPENDENCIES_DIR} ./${DEPENDENCIES_DIR}
diff --git a/components/bbs-event-processor/KeyStore-update.jks b/components/bbs-event-processor/KeyStore-update.jks
new file mode 100644
index 0000000..366cc90
--- /dev/null
+++ b/components/bbs-event-processor/KeyStore-update.jks
Binary files differ
diff --git a/components/bbs-event-processor/KeyStore.jks b/components/bbs-event-processor/KeyStore.jks
new file mode 100644
index 0000000..366cc90
--- /dev/null
+++ b/components/bbs-event-processor/KeyStore.jks
Binary files differ
diff --git a/components/bbs-event-processor/KeyStorePass-update.txt b/components/bbs-event-processor/KeyStorePass-update.txt
new file mode 100644
index 0000000..0c759a4
--- /dev/null
+++ b/components/bbs-event-processor/KeyStorePass-update.txt
@@ -0,0 +1 @@
+test123
\ No newline at end of file
diff --git a/components/bbs-event-processor/KeyStorePass.txt b/components/bbs-event-processor/KeyStorePass.txt
new file mode 100644
index 0000000..0c759a4
--- /dev/null
+++ b/components/bbs-event-processor/KeyStorePass.txt
@@ -0,0 +1 @@
+test123
\ No newline at end of file
diff --git a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml b/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
deleted file mode 100644
index 54d3aae..0000000
--- a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-#============LICENSE_START========================================================
-#=================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-
-tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.1.0
-pnf_reregistration_url: https:message-router:3905/events/unauthenticated.PNF_UPDATE
-cpe_authentication_url: https:message-router:3905/events/unauthenticated.CPE_AUTHENTICATION
-close_loop_url: https:message-router:3905/events/unauthenticated.DCAE_CL_OUTPUT
-application_rereg_policy_scope: policyScopeReReg
-application_rereg_cl_control_name: clControlNameReReg
-application_cpeAuth_policy_scope: policyScopeCpeAuth
-application_cpeAuth_cl_control_name: clControlNameCpeAuth
-application_cbs_polling_interval_sec: 120
-dmaap_consumer_id: c12
-dmaap_consumer_group: OpenDcae-c12
diff --git a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template b/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
deleted file mode 100644
index 2600ffe..0000000
--- a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
+++ /dev/null
@@ -1,188 +0,0 @@
-# -*- indent-tabs-mode: nil -*- # vi: set expandtab:
-#
-# ============LICENSE_START====================================================
-# =============================================================================
-# Copyright (c) 2019 AT&T, NOKIA
-# =============================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END======================================================
-
-tosca_definitions_version: cloudify_dsl_1_3
-
-imports:
- - "http://www.getcloudify.org/spec/cloudify/3.4/types.yaml"
- - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.13/k8splugin_types.yaml
-
-inputs:
- aai_enrichment_host:
- type: string
- default: "aai.onap"
- aai_enrichment_port:
- type: integer
- default: 8443
- aai_enrichment_protocol:
- type: string
- default: "https"
- aai_secure_enable_cert:
- type: boolean
- description: enable certificates-based connection with AAI
- default: true
- tag_version:
- type: string
- replicas:
- type: integer
- description: number of instances
- default: 1
- pnf_reregistration_url:
- type: string
- cpe_authentication_url:
- type: string
- close_loop_url:
- type: string
- application_policy_version:
- description: Policy version value for building CL events
- type: string
- default: "1.0.0.5"
- application_cl_target_type:
- description: Close Loop target type value for building CL events
- type: string
- default: "VM"
- application_cl_event_status:
- description: Close Loop event status value for building CL events
- type: string
- default: "ONSET"
- application_cl_version:
- description: Close Loop version value for building CL events
- type: string
- default: "1.0.2"
- application_cl_target:
- description: Close Loop target value for building CL events
- type: string
- default: "vserver.vserver-name"
- application_cl_originator:
- description: Close Loop originator value for building CL events
- type: string
- default: "DCAE-BBS-ep"
- application_rereg_policy_scope:
- description: Policy Scope value for building PNF relocation CL event
- type: string
- application_rereg_cl_control_name:
- description: Close Loop control name value for building PNF relocation CL event
- type: string
- application_cpeAuth_policy_scope:
- description: Policy Scope value for building CPE Authentication CL event
- type: string
- application_cpeAuth_cl_control_name:
- description: Close Loop control name value for building CPE Authentication CL event
- type: string
- application_cbs_polling_interval_sec:
- type: integer
- default: 300
- application_logging_level:
- type: string
- default: "INFO"
- dmaap_username:
- type: string
- default: admin
- dmaap_password:
- type: string
- default: admin
- dmaap_consumer_id:
- type: string
- dmaap_consumer_group:
- type: string
- dmaap_secure_enable_cert:
- type: boolean
- description: enable certificates-based connection with DMaaP
- default: true
-node_templates:
- bbs-event-processor:
- type: dcae.nodes.ContainerizedServiceComponent
- properties:
- application_config:
- streams_subscribes:
- pnf_reregistration:
- type: message_router
- aaf_username: { get_input: dmaap_username }
- aaf_password: { get_input: dmaap_password }
- dmaap_info:
- topic_url: { get_input: pnf_reregistration_url }
- cpe_authentication:
- type: message_router
- aaf_username: { get_input: dmaap_username }
- aaf_password: { get_input: dmaap_password }
- dmaap_info:
- topic_url: { get_input: cpe_authentication_url }
- streams_publishes:
- close_loop:
- type: message_router
- aaf_username: { get_input: dmaap_username }
- aaf_password: { get_input: dmaap_password }
- dmaap_info:
- topic_url: { get_input: close_loop_url }
- dmaap.protocol: "https"
- dmaap.contentType: "application/json"
- dmaap.consumer.consumerId: { get_input: dmaap_consumer_id }
- dmaap.consumer.consumerGroup: { get_input: dmaap_consumer_group }
- dmaap.messageLimit: -1
- dmaap.timeoutMs: -1
- aai.host: { get_input: aai_enrichment_host }
- aai.port: { get_input: aai_enrichment_port }
- aai.protocol: { get_input: aai_enrichment_protocol }
- aai.username: "AAI"
- aai.password: "AAI"
- aai.aaiIgnoreSslCertificateErrors: true
- application.pipelinesPollingIntervalSec: 30
- application.pipelinesTimeoutSec: 15
- application.cbsPollingIntervalSec: { get_input: application_cbs_polling_interval_sec }
- application.policyVersion: { get_input: application_policy_version }
- application.clTargetType: { get_input: application_cl_target_type }
- application.clEventStatus: { get_input: application_cl_event_status }
- application.clVersion: { get_input: application_cl_version }
- application.clTarget: { get_input: application_cl_target }
- application.clOriginator: { get_input: application_cl_originator }
- application.reregistration.policyScope: { get_input: application_rereg_policy_scope }
- application.reregistration.clControlName: { get_input: application_rereg_cl_control_name }
- application.cpe.authentication.policyScope: { get_input: application_cpeAuth_policy_scope }
- application.cpe.authentication.clControlName: { get_input: application_cpeAuth_cl_control_name }
- application.reregistration.configKey: "pnf_reregistration"
- application.cpeAuth.configKey: "cpe_authentication"
- application.closeLoop.configKey: "close_loop"
- application.loggingLevel: { get_input: application_logging_level }
- application.ssl.keyStorePath: "/opt/app/bbs-event-processor/etc/cert/cert.jks"
- application.ssl.keyStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/jks.pass"
- application.ssl.trustStorePath: "/opt/app/bbs-event-processor/etc/cert/trust.jks"
- application.ssl.trustStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/trust.pass"
- application.ssl.enableAaiCertAuth: { get_input: aai_secure_enable_cert }
- application.ssl.enableDmaapCertAuth: { get_input: dmaap_secure_enable_cert }
- docker_config:
- healthcheck:
- endpoint: /heartbeat
- interval: 180s
- timeout: 5s
- type: http
- image:
- { get_input: tag_version }
- replicas: {get_input: replicas}
- service_component_type: 'bbs-event-processor'
- log_info:
- log_directory: "/opt/app/bbs-event-processor/logs"
- tls_info:
- cert_directory: '/opt/app/bbs-event-processor/etc/cert'
- use_tls: true
- interfaces:
- cloudify.interfaces.lifecycle:
- start:
- inputs:
- ports:
- - concat: ["8100:", "30810"]
diff --git a/components/bbs-event-processor/pom.xml b/components/bbs-event-processor/pom.xml
index 85c8cf4..a2cc155 100644
--- a/components/bbs-event-processor/pom.xml
+++ b/components/bbs-event-processor/pom.xml
@@ -12,7 +12,7 @@
<groupId>org.onap.dcaegen2.services.components</groupId>
<artifactId>bbs-event-processor</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<name>dcaegen2-services-bbs-event-processor</name>
<description>BBS Re-Registration and CPE Authentication Handler</description>
@@ -26,25 +26,21 @@
</licenses>
<properties>
- <java.version>8</java.version>
- <immutables.version>2.7.5</immutables.version>
- <spring-boot.version>2.1.3.RELEASE</spring-boot.version>
- <tomcat.version>8.5.32</tomcat.version>
- <slf4j.version>1.7.25</slf4j.version>
+ <java.version>11</java.version>
+ <immutables.version>2.8.3</immutables.version>
+ <spring-boot.version>2.1.12.RELEASE</spring-boot.version>
<junit-platform.version>1.1.0</junit-platform.version>
<jacoco.version>0.8.2</jacoco.version>
- <dcae.sdk.version>1.1.4</dcae.sdk.version>
- <wiremock.version>2.21.0</wiremock.version>
- <springfox-swagger.version>2.8.0</springfox-swagger.version>
+ <dcae.sdk.version>1.3.4</dcae.sdk.version>
+ <wiremock.version>2.24.0</wiremock.version>
+ <jaxb.api.version>2.3.0</jaxb.api.version>
+ <springfox-swagger.version>2.9.2</springfox-swagger.version>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
<bbs-event-processor.main.class>org.onap.bbs.event.processor.Application</bbs-event-processor.main.class>
<dependency.dir.name>libs</dependency.dir.name>
<dependency.dir.location>${project.build.directory}/${dependency.dir.name}</dependency.dir.location>
<docker.image.name>onap/${project.groupId}.${project.artifactId}</docker.image.name>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
- <sonar.coverage.jacoco.xmlReportPaths>
- ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
- </sonar.coverage.jacoco.xmlReportPaths>
</properties>
<dependencyManagement>
@@ -61,7 +57,7 @@
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>common-dependency</artifactId>
+ <artifactId>http-client</artifactId>
<version>${dcae.sdk.version}</version>
</dependency>
<dependency>
@@ -96,6 +92,11 @@
<artifactId>wiremock-jre8</artifactId>
<version>${wiremock.version}</version>
</dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>${jaxb.api.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -110,7 +111,7 @@
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>common-dependency</artifactId>
+ <artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -170,6 +171,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
@@ -337,14 +342,14 @@
<executions>
<execution>
<id>build-bbs-event-processor-image</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
<execution>
<id>tag-and-push-image-latest</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>tag</goal>
<goal>push</goal>
@@ -357,7 +362,7 @@
</execution>
<execution>
<id>tag-and-push-image-with-version</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>tag</goal>
<goal>push</goal>
@@ -370,7 +375,7 @@
</execution>
<execution>
<id>tag-and-push-image-with-version-and-date</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>tag</goal>
<goal>push</goal>
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
index 66e1f86..94b955f 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
@@ -71,7 +71,7 @@
@Bean
TaskScheduler threadPoolTaskScheduler() {
- ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+ var scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(THREADS_IN_POOL);
scheduler.setThreadNamePrefix("pipeline-thrd-");
return scheduler;
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
index 5022a69..33a935e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
@@ -21,7 +21,10 @@
package org.onap.bbs.event.processor.config;
import static org.onap.bbs.event.processor.config.ApplicationConstants.STREAMS_TYPE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource;
+import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath;
+import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -31,10 +34,12 @@
import org.onap.bbs.event.processor.exceptions.ConfigurationParsingException;
import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
import org.onap.bbs.event.processor.utilities.LoggingUtil;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -48,9 +53,9 @@
private final SecurityProperties securityProperties;
private final GenericProperties genericProperties;
- private DmaapConsumerConfiguration dmaapReRegistrationConsumerConfiguration;
- private DmaapConsumerConfiguration dmaapCpeAuthenticationConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private MessageRouterSubscriberConfig dmaapReRegistrationConsumerConfiguration;
+ private MessageRouterSubscriberConfig dmaapCpeAuthenticationConsumerConfiguration;
+ private MessageRouterPublisherConfig dmaapPublisherConfiguration;
private AaiClientConfiguration aaiClientConfiguration;
private Set<ConfigurationChangeObserver> observers;
@@ -97,15 +102,15 @@
observers.forEach(ConfigurationChangeObserver::updateConfiguration);
}
- public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() {
+ public synchronized MessageRouterSubscriberConfig getDmaapReRegistrationConsumerConfiguration() {
return dmaapReRegistrationConsumerConfiguration;
}
- public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() {
+ public synchronized MessageRouterSubscriberConfig getDmaapCpeAuthenticationConsumerConfiguration() {
return dmaapCpeAuthenticationConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized MessageRouterPublisherConfig getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
@@ -117,6 +122,18 @@
return genericProperties.getPipelinesPollingIntervalSec();
}
+ public synchronized DmaapProducerProperties getDmaapProducerProperties() {
+ return dmaapProducerProperties;
+ }
+
+ public synchronized DmaapReRegistrationConsumerProperties getDmaapReRegistrationConsumerProperties() {
+ return dmaapReRegistrationConsumerProperties;
+ }
+
+ public synchronized DmaapCpeAuthenticationConsumerProperties getDmaapCpeAuthenticationConsumerProperties() {
+ return dmaapCpeAuthenticationConsumerProperties;
+ }
+
public synchronized int getPipelinesTimeoutInSeconds() {
return genericProperties.getPipelinesTimeoutSec();
}
@@ -180,11 +197,17 @@
securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
+ final SecurityKeys securityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
- GeneratedAppConfigObject.StreamsObject reRegObject =
+ var reRegObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
"PNF Re-Registration");
- TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
+ var topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
@@ -196,9 +219,9 @@
dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapReRegistrationConfiguration();
+ constructDmaapReRegistrationConfiguration(securityKeys);
- GeneratedAppConfigObject.StreamsObject cpeAuthObject =
+ var cpeAuthObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(),
"CPE Authentication");
topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl());
@@ -213,9 +236,9 @@
dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapCpeAuthenticationConfiguration();
+ constructDmaapCpeAuthenticationConfiguration(securityKeys);
- GeneratedAppConfigObject.StreamsObject closeLoopObject =
+ var closeLoopObject =
getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(),
"Close Loop");
topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl());
@@ -226,7 +249,7 @@
dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
- constructDmaapProducerConfiguration();
+ constructDmaapProducerConfiguration(securityKeys);
aaiClientProperties.setAaiHost(newConfiguration.aaiHost());
aaiClientProperties.setAaiPort(newConfiguration.aaiPort());
@@ -258,7 +281,7 @@
@NotNull
private GeneratedAppConfigObject.StreamsObject getStreamsObject(
Map<String, GeneratedAppConfigObject.StreamsObject> map, String configKey, String messageName) {
- GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey);
+ var streamsObject = map.get(configKey);
if (!STREAMS_TYPE.equals(streamsObject.type())) {
throw new ApplicationEnvironmentException(String.format("%s requires information about"
+ " message-router topic in ONAP", messageName));
@@ -267,80 +290,45 @@
}
private void constructConfigurationObjects() {
- constructDmaapReRegistrationConfiguration();
- constructDmaapCpeAuthenticationConfiguration();
- constructDmaapProducerConfiguration();
+ final SecurityKeys securityKeysForReRegistration = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ constructDmaapReRegistrationConfiguration(securityKeysForReRegistration);
+ final SecurityKeys securityKeysForCpeAuthentication = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ constructDmaapCpeAuthenticationConfiguration(securityKeysForCpeAuthentication);
+ final SecurityKeys securityKeysForProducer = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ constructDmaapProducerConfiguration(securityKeysForProducer);
constructAaiConfiguration();
}
- private void constructDmaapReRegistrationConfiguration() {
- dmaapReRegistrationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapHostName(dmaapReRegistrationConsumerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapReRegistrationConsumerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapReRegistrationConsumerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapReRegistrationConsumerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapReRegistrationConsumerProperties.getDmaapUserName() == null ? "" :
- dmaapReRegistrationConsumerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapReRegistrationConsumerProperties.getDmaapUserPassword() == null ? "" :
- dmaapReRegistrationConsumerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapReRegistrationConsumerProperties.getDmaapContentType())
- .consumerId(dmaapReRegistrationConsumerProperties.getConsumerId())
- .consumerGroup(dmaapReRegistrationConsumerProperties.getConsumerGroup())
- .timeoutMs(dmaapReRegistrationConsumerProperties.getTimeoutMs())
- .messageLimit(dmaapReRegistrationConsumerProperties.getMessageLimit())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapReRegistrationConfiguration(final SecurityKeys securityKeys) {
+ dmaapReRegistrationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
- private void constructDmaapCpeAuthenticationConfiguration() {
- dmaapCpeAuthenticationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapHostName(dmaapCpeAuthenticationConsumerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapCpeAuthenticationConsumerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapCpeAuthenticationConsumerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapCpeAuthenticationConsumerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserName() == null ? "" :
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword() == null ? "" :
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapCpeAuthenticationConsumerProperties.getDmaapContentType())
- .consumerId(dmaapCpeAuthenticationConsumerProperties.getConsumerId())
- .consumerGroup(dmaapCpeAuthenticationConsumerProperties.getConsumerGroup())
- .timeoutMs(dmaapCpeAuthenticationConsumerProperties.getTimeoutMs())
- .messageLimit(dmaapCpeAuthenticationConsumerProperties.getMessageLimit())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapCpeAuthenticationConfiguration(final SecurityKeys securityKeys) {
+ dmaapCpeAuthenticationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
- private void constructDmaapProducerConfiguration() {
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapHostName(dmaapProducerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapProducerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapProducerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapProducerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapProducerProperties.getDmaapUserName() == null ? "" :
- dmaapProducerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapProducerProperties.getDmaapUserPassword() == null ? "" :
- dmaapProducerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapProducerProperties.getDmaapContentType())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapProducerConfiguration(final SecurityKeys securityKeys) {
+ dmaapPublisherConfiguration = ImmutableMessageRouterPublisherConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
@@ -362,19 +350,19 @@
}
private TopicUrlInfo parseTopicUrl(String topicUrl) {
- String[] urlTokens = topicUrl.split(":");
+ var urlTokens = topicUrl.split(":");
if (urlTokens.length != 3) {
throw new ConfigurationParsingException("Wrong topic URL format");
}
- TopicUrlInfo topicUrlInfo = new TopicUrlInfo();
+ var topicUrlInfo = new TopicUrlInfo();
topicUrlInfo.setHost(urlTokens[1].replace("/", ""));
- String[] tokensAfterHost = urlTokens[2].split("/events/");
+ var tokensAfterHost = urlTokens[2].split("/events/");
if (tokensAfterHost.length != 2) {
throw new ConfigurationParsingException("Wrong topic name structure");
}
topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
- topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
+ topicUrlInfo.setTopicName(tokensAfterHost[1]);
return topicUrlInfo;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
index d2ac3c1..7be6bde 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
@@ -35,5 +35,9 @@
// Close Loop Constants
public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance";
+ // Subscribe for DMaaP
+ public static final String SUBSCRIBE_URL_TEMPLATE = "%s://%s:%d/events/%s";
+ public static final String PUBLISH_URL_TEMPLATE = "%s://%s:%d/events/%s";
+
private ApplicationConstants() {}
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
index 607b3b3..4ac8549 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
@@ -40,8 +40,7 @@
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,9 +84,9 @@
}
boolean environmentNotReady() {
- String consulHost = System.getenv().get(CONSUL_HOST);
- String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
- String hostname = System.getenv().get(HOSTNAME);
+ var consulHost = System.getenv().get(CONSUL_HOST);
+ var cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
+ var hostname = System.getenv().get(HOSTNAME);
return consulHost == null || cbs == null || hostname == null;
}
@@ -106,7 +105,7 @@
private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
- GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
+ var generatedAppConfigObject = generateAppConfigObject(jsonObject);
LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
configuration.updateCurrentConfiguration(generatedAppConfigObject);
}
@@ -124,10 +123,10 @@
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
// Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
- EnvProperties env = EnvProperties.fromEnvironment();
- CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
+ var cbsClientConfig = CbsClientConfiguration.fromEnvironment();
+ var cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
// Create the client and use it to get the configuration
- cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
+ cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig)
.doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
.retry(e -> true)
.flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
@@ -138,57 +137,57 @@
GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
if (LOGGER.isInfoEnabled()) {
- String configAsString = gson.toJson(configObject);
+ var configAsString = gson.toJson(configObject);
LOGGER.info("Received App Config object\n{}", configAsString);
}
- final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
- final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
- final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
- final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
- final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
- final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
+ final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
+ final var dmaapContentType = configObject.get("dmaap.contentType").getAsString();
+ final var dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
+ final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
+ final var dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
+ final var dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
- final String aaiHost = configObject.get("aai.host").getAsString();
- final int aaiPort = configObject.get("aai.port").getAsInt();
- final String aaiProtocol = configObject.get("aai.protocol").getAsString();
- final String aaiUsername = configObject.get("aai.username").getAsString();
- final String aaiPassword = configObject.get("aai.password").getAsString();
- final boolean aaiIgnoreSslCertificateErrors =
+ final var aaiHost = configObject.get("aai.host").getAsString();
+ final var aaiPort = configObject.get("aai.port").getAsInt();
+ final var aaiProtocol = configObject.get("aai.protocol").getAsString();
+ final var aaiUsername = configObject.get("aai.username").getAsString();
+ final var aaiPassword = configObject.get("aai.password").getAsString();
+ final var aaiIgnoreSslCertificateErrors =
configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
- final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
- final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
- final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
+ final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
+ final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
+ final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
- final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
- final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
- final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
- final String cpeAuthClControlName =
+ final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
+ final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
+ final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
+ final var cpeAuthClControlName =
configObject.get("application.cpe.authentication.clControlName").getAsString();
- final String policyVersion = configObject.get("application.policyVersion").getAsString();
- final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
- final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
- final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
- final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
- final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
+ final var policyVersion = configObject.get("application.policyVersion").getAsString();
+ final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
+ final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
+ final var closeLoopVersion = configObject.get("application.clVersion").getAsString();
+ final var closeLoopTarget = configObject.get("application.clTarget").getAsString();
+ final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
- final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
- final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
- final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
+ final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
+ final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
+ final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
- final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
+ final var loggingLevel = configObject.get("application.loggingLevel").getAsString();
- final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
- final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
- final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
- final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
- final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
- final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+ final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+ final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+ final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+ final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+ final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+ final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
- final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
- final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
+ final var streamsPublishes = configObject.getAsJsonObject("streams_publishes");
+ final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
return ImmutableGeneratedAppConfigObject.builder()
.dmaapProtocol(dmaapProtocol)
@@ -259,22 +258,22 @@
private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
Map.Entry<String, JsonElement> jsonEntry) {
- JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
+ var closeLoopOutput = (JsonObject) jsonEntry.getValue();
- String type = closeLoopOutput.get("type").getAsString();
- String aafUsername = closeLoopOutput.get("aaf_username") != null
+ var type = closeLoopOutput.get("type").getAsString();
+ var aafUsername = closeLoopOutput.get("aaf_username") != null
? closeLoopOutput.get("aaf_username").getAsString() : "";
- String aafPassword = closeLoopOutput.get("aaf_password") != null
+ var aafPassword = closeLoopOutput.get("aaf_password") != null
? closeLoopOutput.get("aaf_password").getAsString() : "";
- JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
- String clientId = dmaapInfo.get("client_id") != null
+ var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
+ var clientId = dmaapInfo.get("client_id") != null
? dmaapInfo.get("client_id").getAsString() : "";
- String clientRole = dmaapInfo.get("client_role") != null
+ var clientRole = dmaapInfo.get("client_role") != null
? dmaapInfo.get("client_role").getAsString() : "";
- String location = dmaapInfo.get("location") != null
+ var location = dmaapInfo.get("location") != null
? dmaapInfo.get("location").getAsString() : "";
- String topicUrl = dmaapInfo.get("topic_url").getAsString();
+ var topicUrl = dmaapInfo.get("topic_url").getAsString();
GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
.clientId(clientId)
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java
new file mode 100644
index 0000000..b0ecfd5
--- /dev/null
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+*/
+
+package org.onap.bbs.event.processor.config;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MessageRouterConfig {
+
+ @Bean(name = "ReRegMessageRouterSubscriber")
+ public MessageRouterSubscriber reRegistrationMessageRouterSubscriber(ApplicationConfiguration configuration) {
+ return DmaapClientFactory
+ .createMessageRouterSubscriber(configuration.getDmaapReRegistrationConsumerConfiguration());
+ }
+
+ @Bean(name = "CpeAuthMessageRouterSubscriber")
+ public MessageRouterSubscriber registrationMessageRouterSubscriber(ApplicationConfiguration configuration) {
+ return DmaapClientFactory
+ .createMessageRouterSubscriber(configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ }
+
+ @Bean
+ public MessageRouterPublisher mrPub(ApplicationConfiguration configuration) {
+ return DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ }
+}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
index 1be4f43..e2e4979 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
@@ -22,10 +22,6 @@
public class EmptyDmaapResponseException extends RuntimeException {
- public EmptyDmaapResponseException() {
- super();
- }
-
public EmptyDmaapResponseException(String message) {
super(message);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
index 8fa73e4..c952e9b 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
@@ -26,11 +26,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface ControlLoopPublisherDmaapModel extends DmaapModel {
+public interface ControlLoopPublisherDmaapModel {
@SerializedName(value = "closedLoopEventClient", alternate = "closedLoopEventClient")
String getClosedLoopEventClient();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
index 42c9896..50ed66e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
@@ -26,12 +26,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface CpeAuthenticationConsumerDmaapModel extends AaiModel, DmaapModel {
+public interface CpeAuthenticationConsumerDmaapModel {
@SerializedName(value = "correlationId", alternate = "correlationId")
String getCorrelationId();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
index 682c064..f64f30e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
@@ -26,11 +26,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true, emptyAsNulls = true)
-public interface PnfAaiObject extends ClientModel {
+public interface PnfAaiObject {
@SerializedName(value = "pnf-name", alternate = "pnf-name")
String getPnfName();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
index 07fb75a..3f58aa3 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
@@ -24,12 +24,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface ReRegistrationConsumerDmaapModel extends AaiModel, DmaapModel {
+public interface ReRegistrationConsumerDmaapModel {
@SerializedName(value = "correlationId", alternate = "correlationId")
String getCorrelationId();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
index a30903b..2cab017 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
@@ -25,13 +25,11 @@
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -42,14 +40,13 @@
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.bbs.event.processor.model.MetadataListAaiObject;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -99,7 +96,7 @@
LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
}
- Flux<HttpResponse> executePipeline() {
+ Flux<MessageRouterPublishResponse> executePipeline() {
return
// Consume CPE Authentication from DMaaP
consumeCpeAuthenticationFromDmaap()
@@ -111,12 +108,12 @@
.flatMap(this::triggerPolicy);
}
- private void onSuccess(HttpResponse responseCode) {
- MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
- LOGGER.info("CPE Authentication event successfully handled. "
- + "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.statusCode(), responseCode.statusReason());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ LOGGER.info("CPE Authentication event successfully handled. Published Policy event to DMaaP");
+ } else {
+ LOGGER.error("CPE Authentication event handling error [{}]", response.failReason());
+ }
}
private void onError(Throwable throwable) {
@@ -149,7 +146,7 @@
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
// in each step and handed off to the next processing step
- PipelineState state = new PipelineState();
+ var state = new PipelineState();
state.setCpeAuthenticationEvent(event);
return state;
});
@@ -161,9 +158,9 @@
private Mono<PipelineState> fetchPnfFromAai(PipelineState state) {
- CpeAuthenticationConsumerDmaapModel vesEvent = state.getCpeAuthenticationEvent();
- String pnfName = vesEvent.getCorrelationId();
- String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
+ var vesEvent = state.getCpeAuthenticationEvent();
+ var pnfName = vesEvent.getCorrelationId();
+ var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url);
return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url)
@@ -191,10 +188,10 @@
return Mono.empty();
}
- PnfAaiObject pnf = state.getPnfAaiObject();
+ var pnf = state.getPnfAaiObject();
// Assuming that the PNF will only have a single service-instance relationship pointing
// towards the HSI CFS service
- String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -208,7 +205,7 @@
return Mono.empty();
}
- String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
serviceInstanceId);
LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url);
return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url)
@@ -230,21 +227,13 @@
});
}
- private Mono<HttpResponse> triggerPolicy(PipelineState state) {
+ private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
- return Mono.empty();
+ return Flux.empty();
}
- // At this point, we must check if the PNF RGW MAC address matches the value extracted from VES event
- if (!isCorrectMacAddress(state)) {
- LOGGER.warn("Processing stopped. RGW MAC address taken from event ({}) "
- + "does not match with A&AI metadata corresponding value",
- state.getCpeAuthenticationEvent().getRgwMacAddress());
- return Mono.empty();
- }
-
- ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state);
+ var event = buildTriggeringPolicyEvent(state);
return publisherTask.execute(event)
.timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds()))
.doOnError(TimeoutException.class,
@@ -256,30 +245,15 @@
e -> Mono.empty());
}
-
- private boolean isCorrectMacAddress(PipelineState state) {
- // We need to check if the RGW MAC address received in VES event matches the one found in
- // HSIA CFS service (in its metadata section)
- Optional<MetadataListAaiObject> optionalMetadata = state.getHsiCfsServiceInstance()
- .getMetadataListAaiObject();
- String eventRgwMacAddress = state.getCpeAuthenticationEvent().getRgwMacAddress().orElse("");
- return optionalMetadata
- .map(list -> list.getMetadataEntries()
- .stream()
- .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname())
- && m.getMetavalue().equals(eventRgwMacAddress)))
- .orElse(false);
- }
-
private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) {
- String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
+ var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
Map<String, String> enrichmentData = new HashMap<>();
enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId);
enrichmentData.put("cpe.old-authentication-state", state.cpeAuthenticationEvent.getOldAuthenticationState());
enrichmentData.put("cpe.new-authentication-state", state.cpeAuthenticationEvent.getNewAuthenticationState());
- String swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse("");
+ var swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse("");
if (!StringUtils.isEmpty(swVersion)) {
enrichmentData.put("cpe.swVersion", swVersion);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
index 33a9aea..0bc5e35 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
@@ -25,12 +25,10 @@
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -48,7 +46,7 @@
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -98,7 +96,7 @@
LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
}
- Flux<HttpResponse> executePipeline() {
+ Flux<MessageRouterPublishResponse> executePipeline() {
return
// Consume Re-Registration from DMaaP
consumeReRegistrationsFromDmaap()
@@ -110,12 +108,12 @@
.flatMap(this::triggerPolicy);
}
- private void onSuccess(HttpResponse responseCode) {
- MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
- LOGGER.info("PNF Re-Registration event successfully handled. "
- + "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.statusCode(), responseCode.statusReason());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ LOGGER.info("PNF Re-Registration event successfully handled. Published Policy event to DMaaP");
+ } else {
+ LOGGER.error("PNF Re-Registration event handling error [{}]", response.failReason());
+ }
}
private void onError(Throwable throwable) {
@@ -148,7 +146,7 @@
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
// in each step and handed off to the next processing step
- PipelineState state = new PipelineState();
+ var state = new PipelineState();
state.setReRegistrationEvent(event);
return state;
});
@@ -160,9 +158,9 @@
private Mono<PipelineState> fetchPnfFromAai(PipelineState state) {
- ReRegistrationConsumerDmaapModel vesEvent = state.getReRegistrationEvent();
- String pnfName = vesEvent.getCorrelationId();
- String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
+ var vesEvent = state.getReRegistrationEvent();
+ var pnfName = vesEvent.getCorrelationId();
+ var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url);
return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url)
@@ -196,10 +194,10 @@
return Mono.empty();
}
- PnfAaiObject pnf = state.getPnfAaiObject();
+ var pnf = state.getPnfAaiObject();
// Assuming that the PNF will only have a single service-instance relationship pointing
// towards the HSI CFS service
- String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -213,7 +211,7 @@
return Mono.empty();
}
- String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
serviceInstanceId);
LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url);
return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url)
@@ -236,8 +234,7 @@
}
private boolean isNotReallyAnOntRelocation(PipelineState state) {
- List<RelationshipListAaiObject.RelationshipEntryAaiObject> relationshipEntries =
- state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
+ var relationshipEntries = state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
// If no logical-link, fail further processing
if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) {
@@ -247,7 +244,7 @@
}
// Assuming PNF will only have one logical-link per BBS use case design
- boolean isNotRelocation = relationshipEntries
+ var isNotRelocation = relationshipEntries
.stream()
.filter(e -> "logical-link".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -263,13 +260,13 @@
return isNotRelocation;
}
- private Mono<HttpResponse> triggerPolicy(PipelineState state) {
+ private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
- return Mono.empty();
+ return Flux.empty();
}
- ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state);
+ var event = buildTriggeringPolicyEvent(state);
return publisherTask.execute(event)
.timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds()))
.doOnError(TimeoutException.class,
@@ -283,12 +280,12 @@
private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) {
- String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
+ var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
- String attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint();
- String remoteId = state.getReRegistrationEvent().getRemoteId();
- String cvlan = state.getReRegistrationEvent().getCVlan();
- String svlan = state.getReRegistrationEvent().getSVlan();
+ var attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint();
+ var remoteId = state.getReRegistrationEvent().getRemoteId();
+ var cvlan = state.getReRegistrationEvent().getCVlan();
+ var svlan = state.getReRegistrationEvent().getSVlan();
Map<String, String> enrichmentData = new HashMap<>();
enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
index 5fbb087..882635c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
@@ -96,7 +96,7 @@
LOGGER.info("BBS event processing pipelines will start in {} seconds "
+ "and will run periodically every {} seconds", PIPELINES_INITIAL_DELAY_IN_SECONDS,
currentPipelinesPollingInterval);
- Instant desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS);
+ var desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS);
scheduleProcessingTasks(desiredStartTime, currentPipelinesPollingInterval);
// Register for configuration changes
@@ -118,7 +118,7 @@
cancelScheduledProcessingTasks();
reScheduleProcessingTasks();
}
- int newCbsPollingInterval = configuration.getCbsPollingInterval();
+ var newCbsPollingInterval = configuration.getCbsPollingInterval();
if (newCbsPollingInterval != currentCbsPollingInterval) {
if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) {
LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job",
@@ -222,9 +222,9 @@
}
private int validatePipelinesPollingInterval() {
- int pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds();
- boolean isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL;
- int verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval;
+ var pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds();
+ var isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL;
+ var verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval;
if (isSmallInterval) {
LOGGER.warn("Pipelines Polling interval is too small ({}). Defaulting to {}", pipelinesPollingInterval,
DEFAULT_PIPELINES_POLLING_INTERVAL);
@@ -233,9 +233,9 @@
}
private int verifyCbsPollingInterval() {
- int cbsPollingInterval = configuration.getCbsPollingInterval();
- boolean isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL;
- int verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval;
+ var cbsPollingInterval = configuration.getCbsPollingInterval();
+ var isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL;
+ var verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval;
if (isSmallInterval) {
LOGGER.warn("CBS Polling interval is too small ({}). Defaulting to {}", cbsPollingInterval,
DEFAULT_CBS_POLLING_INTERVAL);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index da51028..153cb91 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,59 +20,61 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
-public class DmaapCpeAuthenticationConsumerTaskImpl
- implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver {
+public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask,
+ ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
- new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
-
- private DMaaPConsumerReactiveHttpClient httpClient;
+ new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
@Autowired
- public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration,
- CpeAuthenticationDmaapConsumerJsonParser
- cpeAuthenticationDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException {
+ @Qualifier("CpeAuthMessageRouterSubscriber")
+ MessageRouterSubscriber subscriber,
+ CpeAuthenticationDmaapConsumerJsonParser parser) {
+ this.cpeAuthenticationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +89,25 @@
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@Override
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest)
+ .flatMap(jsonElement ->
+ cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +115,4 @@
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index 749c4e5..dec1dbc 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,11 +21,11 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
public interface DmaapPublisherTask {
- Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 283e5ef..6c50b10 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,48 @@
package org.onap.bbs.event.processor.tasks;
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private ApplicationConfiguration configuration;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- private DMaaPPublisherReactiveHttpClient httpClient;
+ private ApplicationConfiguration configuration;
+ private MessageRouterPublisher publisher;
+ private String publishUrl;
+ private MessageRouterPublishRequest publishRequest;
@Autowired
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration) {
- this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),
- new ControlLoopJsonBodyBuilder()));
- }
-
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) {
this.configuration = configuration;
- this.httpClientFactory = httpClientFactory;
-
- try {
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ this.publisher = publisher;
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ this.configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapProducerProperties().getDmaapHostName(),
+ this.configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@PostConstruct
@@ -83,27 +77,23 @@
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- try {
- LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ publisher =
+ DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ configuration.getDmaapProducerProperties().getDmaapHostName(),
+ configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@Override
- public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
- if (controlLoopPublisherDmaapModel == null) {
+ public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) {
+ if (event == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
LOGGER.info("Executing task for publishing control loop message");
- LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
- DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- }
-
- private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
- return httpClient;
+ LOGGER.debug("CL message \n{}", event);
+ return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event)));
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index e40037b..aff563c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,25 +20,23 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@@ -49,30 +47,33 @@
ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP");
- private DMaaPConsumerReactiveHttpClient httpClient;
-
@Autowired
- public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new ReRegistrationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration,
- ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory)
- throws SSLException {
+ @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber,
+ ReRegistrationDmaapConsumerJsonParser parser) {
+ this.reRegistrationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +88,25 @@
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapReRegistrationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@Override
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest)
+ .flatMap(jsonElement ->
+ reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +114,4 @@
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
index 84fc9f7..c0e9b1c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
@@ -20,6 +20,8 @@
package org.onap.bbs.event.processor.utilities;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource;
+import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import com.google.gson.Gson;
@@ -27,9 +29,10 @@
import io.netty.handler.ssl.SslContext;
+import java.nio.file.Paths;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.AaiClientConfiguration;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
@@ -37,7 +40,9 @@
import org.onap.bbs.event.processor.exceptions.AaiTaskException;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
-import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -64,7 +69,7 @@
private AaiClientConfiguration aaiClientConfiguration;
@Autowired
- AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) throws SSLException {
+ AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) {
this.configuration = configuration;
this.gson = gson;
this.sslFactory = new SslFactory();
@@ -85,25 +90,20 @@
@Override
public void updateConfiguration() {
- AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration();
+ var newConfiguration = configuration.getAaiClientConfiguration();
if (aaiClientConfiguration.equals(newConfiguration)) {
LOGGER.info("No Configuration changes necessary for AAI Reactive client");
} else {
synchronized (this) {
LOGGER.info("AAI Reactive client must be re-configured");
aaiClientConfiguration = newConfiguration;
- try {
- setupWebClient();
- } catch (SSLException e) {
- LOGGER.error("AAI Reactive client SSL error while re-configuring WebClient");
- LOGGER.debug("SSL Exception\n", e);
- }
+ setupWebClient();
}
}
}
- private void setupWebClient() throws SSLException {
- SslContext sslContext = createSslContext();
+ private void setupWebClient() {
+ var sslContext = createSslContext();
ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
@@ -132,7 +132,7 @@
private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) {
LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName());
- WebClient webClient = getWebClient();
+ var webClient = getWebClient();
return webClient
.get()
.uri(url)
@@ -179,17 +179,18 @@
});
}
- private SslContext createSslContext() throws SSLException {
+ private SslContext createSslContext() {
if (aaiClientConfiguration.enableAaiCertAuth()) {
LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
- return sslFactory.createSecureContext(
- aaiClientConfiguration.keyStorePath(),
- aaiClientConfiguration.keyStorePasswordPath(),
- aaiClientConfiguration.trustStorePath(),
- aaiClientConfiguration.trustStorePasswordPath()
- );
+ final SecurityKeys securityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(aaiClientConfiguration.keyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(aaiClientConfiguration.keyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(aaiClientConfiguration.trustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(aaiClientConfiguration.trustStorePasswordPath())))
+ .build();
+ return sslFactory.createSecureClientContext(securityKeys);
}
- return sslFactory.createInsecureContext();
+ return sslFactory.createInsecureClientContext();
}
private synchronized WebClient getWebClient() {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
index c2b6734..b3fa09d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.utilities;
-public class CommonEventFields {
+class CommonEventFields {
static final String COMMON_FORMAT = "\": \"%s\"";
static final String EVENT = "event";
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
index 03ea093..f27cca5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
@@ -21,24 +21,23 @@
package org.onap.bbs.event.processor.utilities;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
import com.google.gson.TypeAdapterFactory;
import java.util.ServiceLoader;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPublisherDmaapModel> {
+public class ControlLoopJsonBodyBuilder {
/**
* Serialize the Control Loop DMaaP model with GSON.
* @param publisherDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(ControlLoopPublisherDmaapModel publisherDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder().disableHtmlEscaping();
+ var gsonBuilder = new GsonBuilder().disableHtmlEscaping();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableControlLoopPublisherDmaapModel.builder()
.closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient())
@@ -56,4 +55,30 @@
.originator(publisherDmaapModel.getOriginator())
.build());
}
+
+ /**
+ * Serialize the Control Loop DMaaP model with GSON.
+ * @param publisherDmaapModel object to be serialized
+ * @return String output of serialization
+ */
+ public static JsonElement createAsJsonElement(ControlLoopPublisherDmaapModel publisherDmaapModel) {
+ var gsonBuilder = new GsonBuilder().disableHtmlEscaping();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ return gsonBuilder.create().toJsonTree(ImmutableControlLoopPublisherDmaapModel.builder()
+ .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient())
+ .policyVersion(publisherDmaapModel.getPolicyVersion())
+ .policyName(publisherDmaapModel.getPolicyName())
+ .policyScope(publisherDmaapModel.getPolicyScope())
+ .targetType(publisherDmaapModel.getTargetType())
+ .aaiEnrichmentData(publisherDmaapModel.getAaiEnrichmentData())
+ .closedLoopAlarmStart(publisherDmaapModel.getClosedLoopAlarmStart())
+ .closedLoopEventStatus(publisherDmaapModel.getClosedLoopEventStatus())
+ .closedLoopControlName(publisherDmaapModel.getClosedLoopControlName())
+ .version(publisherDmaapModel.getVersion())
+ .target(publisherDmaapModel.getTarget())
+ .requestId(publisherDmaapModel.getRequestId())
+ .originator(publisherDmaapModel.getOriginator())
+ .build());
+ }
+
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
index 3cff4e6..5a0466a 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
@@ -46,11 +46,13 @@
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+@Component
public class CpeAuthenticationDmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class);
@@ -108,9 +110,9 @@
return Mono.empty();
}
- JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
+ var commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
.getAsJsonObject(COMMON_EVENT_HEADER);
- JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
+ var stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
.getAsJsonObject(STATE_CHANGE_FIELDS);
pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
@@ -120,7 +122,7 @@
stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE);
if (stateChangeFields.has(ADDITIONAL_FIELDS)) {
- JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
+ var additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS);
swVersion = getValueFromJson(additionalFields, SW_VERSION);
}
@@ -128,7 +130,7 @@
if (StringUtils.isEmpty(pnfSourceName)
|| authenticationStatusMissing(oldAuthenticationStatus)
|| authenticationStatusMissing(newAuthenticationStatus)) {
- String incorrectEvent = dumpJsonData();
+ var incorrectEvent = dumpJsonData();
LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent);
return Mono.empty();
}
@@ -164,7 +166,7 @@
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
+ var jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
index 09aa730..df8eaa4 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
@@ -27,18 +27,16 @@
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class CpeAuthenticationJsonBodyBuilder implements JsonBodyBuilder<CpeAuthenticationConsumerDmaapModel> {
+public class CpeAuthenticationJsonBodyBuilder {
/**
* Serialize the CPE authentication DMaaP model with GSON.
* @param cpeAuthenticationConsumerDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(cpeAuthenticationConsumerDmaapModel.getCorrelationId())
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java
new file mode 100644
index 0000000..be8bea0
--- /dev/null
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.bbs.event.processor.utilities;
+
+import java.nio.file.Paths;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeysStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+public class GenericUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GenericUtils.class);
+
+ private GenericUtils() {}
+
+ /**
+ * Creates Message Router subscription request.
+ * @param topicUrl URL of topic to use
+ * @param consumerGroup Consumer Group for subscription
+ * @param consumerId Consumer ID for subscription
+ * @return request based on provided input
+ */
+ public static MessageRouterSubscribeRequest createSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId) {
+ var sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("Subscriber source")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ /**
+ * Creates Message Router publish request.
+ * @param topicUrl URL of topic to use
+ * @return request based on provided input
+ */
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl) {
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .topicUrl(topicUrl)
+ .name("Producer sink")
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.APPLICATION_JSON)
+ .build();
+ }
+
+ /**
+ * Creates a security key store for HTTPS.
+ * @param resource identifying the resource from which we will read security information
+ * @return store that will be used for HTTPS
+ */
+ @NotNull public static SecurityKeysStore keyStoreFromResource(String resource) {
+ if (StringUtils.isEmpty(resource)) {
+ throw new ApplicationEnvironmentException("Resource for security key store is empty");
+ }
+ var path = Paths.get(resource);
+ LOGGER.info("Reading keys from {}", path.toString());
+ return SecurityKeysStore.fromPath(path);
+ }
+}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
index 9fe0c27..5e249e5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
@@ -41,11 +41,13 @@
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+@Component
public class ReRegistrationDmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class);
@@ -101,7 +103,7 @@
return Mono.empty();
}
- JsonObject pnfReRegistrationFields =
+ var pnfReRegistrationFields =
dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS);
pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID);
@@ -112,7 +114,7 @@
svlan = getValueFromJson(pnfReRegistrationFields, SVLAN);
if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) {
- String incorrectEvent = dumpJsonData();
+ var incorrectEvent = dumpJsonData();
LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent);
return Mono.empty();
}
@@ -148,7 +150,7 @@
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
+ var jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
index 867cfda..bf90a4c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
@@ -27,18 +27,16 @@
import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class ReRegistrationJsonBodyBuilder implements JsonBodyBuilder<ReRegistrationConsumerDmaapModel> {
+public class ReRegistrationJsonBodyBuilder {
/**
* Serialize the Re-Registration DMaaP model with GSON.
* @param reRegistrationConsumerDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(reRegistrationConsumerDmaapModel.getCorrelationId())
diff --git a/components/bbs-event-processor/src/main/resources/application.yml b/components/bbs-event-processor/src/main/resources/application.yml
index 9092ada..ed99643 100644
--- a/components/bbs-event-processor/src/main/resources/application.yml
+++ b/components/bbs-event-processor/src/main/resources/application.yml
@@ -9,7 +9,7 @@
re-registration:
dmaapHostName: localhost
dmaapPortNumber: 2222
- dmaapTopicName: /events/unauthenticated.PNF_Update
+ dmaapTopicName: unauthenticated.PNF_Update
dmaapProtocol: http
dmaapContentType: application/json
consumerId: c12
@@ -19,7 +19,7 @@
cpe-authentication:
dmaapHostName: localhost
dmaapPortNumber: 2222
- dmaapTopicName: /events/unauthenticated.CPE_Authentication
+ dmaapTopicName: unauthenticated.CPE_Authentication
dmaapProtocol: http
dmaapContentType: application/json
consumerId: c12
@@ -29,7 +29,7 @@
producer:
dmaapHostName: localhost
dmaapPortNumber: 2223
- dmaapTopicName: /events/unauthenticated.DCAE_CL_OUTPUT
+ dmaapTopicName: unauthenticated.DCAE_CL_OUTPUT
dmaapProtocol: http
dmaapContentType: application/json
aai:
@@ -47,14 +47,14 @@
Real-Time: true
Content-Type: application/json
security:
- trustStorePath: change it
- trustStorePasswordPath: change it
- keyStorePath: change it
- keyStorePasswordPath: change it
+ trustStorePath: /opt/app/bbs-event-processor/etc/cert/trust.jks
+ trustStorePasswordPath: /opt/app/bbs-event-processor/etc/cert/trust.pass
+ keyStorePath: /opt/app/bbs-event-processor/etc/cert/cert.jks
+ keyStorePasswordPath: /opt/app/bbs-event-processor/etc/cert/jks.pass
enableAaiCertAuth: false
enableDmaapCertAuth: false
application:
- pipelinesPollingIntervalSec: 30
+ pipelinesPollingIntervalSec: 25
pipelinesTimeoutSec: 15
policyVersion: 1.0.0.5
clTargetType: VM
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
index 69fbb3f..2d9e49f 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
@@ -37,8 +37,6 @@
import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.SpringBootTest;
@@ -66,6 +64,8 @@
"configs.aai.client.aaiHeaders.Content-Type=application/merge-patch+json",
"configs.dmaap.consumer.re-registration.dmaapHostName=test localhost",
"configs.dmaap.consumer.re-registration.dmaapPortNumber=1234",
+ "configs.dmaap.consumer.re-registration.dmaapUserName=",
+ "configs.dmaap.consumer.re-registration.dmaapUserPassword=",
"configs.dmaap.consumer.re-registration.dmaapTopicName=/events/unauthenticated.PNF_REREGISTRATION",
"configs.dmaap.consumer.re-registration.dmaapProtocol=http",
"configs.dmaap.consumer.re-registration.dmaapContentType=application/json",
@@ -75,6 +75,8 @@
"configs.dmaap.consumer.re-registration.messageLimit=1",
"configs.dmaap.consumer.cpe-authentication.dmaapHostName=test localhost",
"configs.dmaap.consumer.cpe-authentication.dmaapPortNumber=1234",
+ "configs.dmaap.consumer.cpe-authentication.dmaapUserName=",
+ "configs.dmaap.consumer.cpe-authentication.dmaapUserPassword=",
"configs.dmaap.consumer.cpe-authentication.dmaapTopicName=/events/unauthenticated.CPE_AUTHENTICATION",
"configs.dmaap.consumer.cpe-authentication.dmaapProtocol=http",
"configs.dmaap.consumer.cpe-authentication.dmaapContentType=application/json",
@@ -84,13 +86,15 @@
"configs.dmaap.consumer.cpe-authentication.messageLimit=1",
"configs.dmaap.producer.dmaapHostName=test localhost",
"configs.dmaap.producer.dmaapPortNumber=1234",
+ "configs.dmaap.producer.dmaapUserName=",
+ "configs.dmaap.producer.dmaapUserPassword=",
"configs.dmaap.producer.dmaapTopicName=/events/unauthenticated.DCAE_CL_OUTPUT",
"configs.dmaap.producer.dmaapProtocol=http",
"configs.dmaap.producer.dmaapContentType=application/json",
- "configs.security.trustStorePath=test trust store path",
- "configs.security.trustStorePasswordPath=test trust store password path",
- "configs.security.keyStorePath=test key store path",
- "configs.security.keyStorePasswordPath=test key store password path",
+ "configs.security.trustStorePath=KeyStore.jks",
+ "configs.security.trustStorePasswordPath=KeyStorePass.txt",
+ "configs.security.keyStorePath=KeyStore.jks",
+ "configs.security.keyStorePasswordPath=KeyStorePass.txt",
"configs.security.enableDmaapCertAuth=false",
"configs.security.enableAaiCertAuth=false",
"configs.application.pipelinesPollingIntervalSec=30",
@@ -132,7 +136,7 @@
@Test
void testA_configurationObjectSuccessfullyPopulated() {
- AaiClientConfiguration aaiClientConfiguration = configuration.getAaiClientConfiguration();
+ var aaiClientConfiguration = configuration.getAaiClientConfiguration();
assertAll("AAI Client Configuration Properties",
() -> assertEquals("test localhost", aaiClientConfiguration.aaiHost()),
() -> assertEquals(Integer.valueOf(1234), aaiClientConfiguration.aaiPort()),
@@ -148,50 +152,64 @@
aaiClientConfiguration.aaiHeaders().get("Content-Type"))
);
- DmaapConsumerConfiguration dmaapConsumerReRegistrationConfig =
- configuration.getDmaapReRegistrationConsumerConfiguration();
assertAll("DMaaP Consumer Re-Registration Configuration Properties",
- () -> assertEquals("test localhost", dmaapConsumerReRegistrationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(1234), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
+ () -> assertEquals("test localhost",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(1234,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber()),
() -> assertEquals("/events/unauthenticated.PNF_REREGISTRATION",
- dmaapConsumerReRegistrationConfig.dmaapTopicName()),
- () -> assertEquals("http", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
- () -> assertEquals("c12", dmaapConsumerReRegistrationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c12", dmaapConsumerReRegistrationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(-1), dmaapConsumerReRegistrationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(1), dmaapConsumerReRegistrationConfig.messageLimit())
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("http",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c12",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c12",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(-1, configuration.getDmaapReRegistrationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(1, configuration.getDmaapReRegistrationConsumerProperties().getMessageLimit())
);
- DmaapConsumerConfiguration dmaapConsumerCpeAuthenticationConfig =
- configuration.getDmaapCpeAuthenticationConsumerConfiguration();
assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
- () -> assertEquals("test localhost", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(1234), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
+ () -> assertEquals("test localhost",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(1234,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber()),
() -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION",
- dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
- () -> assertEquals("http", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
- () -> assertEquals("c12", dmaapConsumerCpeAuthenticationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c12", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(-1), dmaapConsumerCpeAuthenticationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(1), dmaapConsumerCpeAuthenticationConfig.messageLimit())
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("http",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c12",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c12",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(-1, configuration.getDmaapCpeAuthenticationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(1,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getMessageLimit())
);
- DmaapPublisherConfiguration dmaapPublisherConfiguration = configuration.getDmaapPublisherConfiguration();
assertAll("DMaaP Publisher Configuration Properties",
- () -> assertEquals("test localhost", dmaapPublisherConfiguration.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(1234), dmaapPublisherConfiguration.dmaapPortNumber()),
+ () -> assertEquals("test localhost",
+ configuration.getDmaapProducerProperties().getDmaapHostName()),
+ () -> assertEquals(1234, configuration.getDmaapProducerProperties().getDmaapPortNumber()),
() -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT",
- dmaapPublisherConfiguration.dmaapTopicName()),
- () -> assertEquals("http", dmaapPublisherConfiguration.dmaapProtocol()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
+ configuration.getDmaapProducerProperties().getDmaapTopicName()),
+ () -> assertEquals("http", configuration.getDmaapProducerProperties().getDmaapProtocol()),
+ () -> assertEquals("", configuration.getDmaapProducerProperties().getDmaapUserName()),
+ () -> assertEquals("", configuration.getDmaapProducerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapProducerProperties().getDmaapContentType())
);
assertAll("Generic Application Properties",
@@ -211,12 +229,11 @@
assertAll("Security Application Properties",
() -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()),
- () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
- () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()),
- () -> assertEquals("test key store password path",
+ () -> assertEquals("KeyStore.jks", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("KeyStorePass.txt",
aaiClientConfiguration.keyStorePasswordPath()),
- () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()),
- () -> assertEquals("test trust store password path",
+ () -> assertEquals("KeyStore.jks", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("KeyStorePass.txt",
aaiClientConfiguration.trustStorePasswordPath())
);
}
@@ -298,10 +315,10 @@
.cpeAuthConfigKey("config_key_2")
.closeLoopConfigKey("config_key_3")
.loggingLevel("TRACE")
- .keyStorePath("test key store path - update")
- .keyStorePasswordPath("test key store password path - update")
- .trustStorePath("test trust store path - update")
- .trustStorePasswordPath("test trust store password path - update")
+ .keyStorePath("KeyStore-update.jks")
+ .keyStorePasswordPath("KeyStorePass-update.txt")
+ .trustStorePath("KeyStore-update.jks")
+ .trustStorePasswordPath("KeyStorePass-update.txt")
.enableAaiCertAuth(true)
.enableDmaapCertAuth(true)
.streamSubscribesMap(subscribes)
@@ -311,7 +328,7 @@
// Update the configuration
configuration.updateCurrentConfiguration(updatedConfiguration);
- AaiClientConfiguration aaiClientConfiguration = configuration.getAaiClientConfiguration();
+ var aaiClientConfiguration = configuration.getAaiClientConfiguration();
assertAll("AAI Client Configuration Properties",
() -> assertEquals("aai.onap.svc.cluster.local", aaiClientConfiguration.aaiHost()),
() -> assertEquals(Integer.valueOf(8443), aaiClientConfiguration.aaiPort()),
@@ -327,50 +344,65 @@
aaiClientConfiguration.aaiHeaders().get("Content-Type"))
);
- DmaapConsumerConfiguration dmaapConsumerReRegistrationConfig =
- configuration.getDmaapReRegistrationConsumerConfiguration();
assertAll("DMaaP Consumer Re-Registration Configuration Properties",
- () -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
- () -> assertEquals("events/unauthenticated.PNF_UPDATE",
- dmaapConsumerReRegistrationConfig.dmaapTopicName()),
- () -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
- () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()),
- () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
- () -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(5), dmaapConsumerReRegistrationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(10), dmaapConsumerReRegistrationConfig.messageLimit())
+ () -> assertEquals("we-are-message-router1.us",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(3901,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber()),
+ () -> assertEquals("unauthenticated.PNF_UPDATE",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("https",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("some-user",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("some-password",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c13",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c13",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(5, configuration.getDmaapReRegistrationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(10, configuration.getDmaapReRegistrationConsumerProperties().getMessageLimit())
);
- DmaapConsumerConfiguration dmaapConsumerCpeAuthenticationConfig =
- configuration.getDmaapCpeAuthenticationConsumerConfiguration();
assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
- () -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
- () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION",
- dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
- () -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
- () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
- () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
- () -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(5), dmaapConsumerCpeAuthenticationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(10), dmaapConsumerCpeAuthenticationConfig.messageLimit())
+ () -> assertEquals("we-are-message-router2.us",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(3902,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber()),
+ () -> assertEquals("unauthenticated.CPE_AUTHENTICATION",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("https",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("some-user",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("some-password",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c13",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c13",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(5, configuration.getDmaapCpeAuthenticationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(10,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getMessageLimit())
);
- DmaapPublisherConfiguration dmaapPublisherConfiguration = configuration.getDmaapPublisherConfiguration();
assertAll("DMaaP Publisher Configuration Properties",
- () -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()),
- () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT",
- dmaapPublisherConfiguration.dmaapTopicName()),
- () -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()),
- () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()),
- () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
+ () -> assertEquals("we-are-message-router3.us",
+ configuration.getDmaapProducerProperties().getDmaapHostName()),
+ () -> assertEquals(3903, configuration.getDmaapProducerProperties().getDmaapPortNumber()),
+ () -> assertEquals("unauthenticated.DCAE_CL_OUTPUT",
+ configuration.getDmaapProducerProperties().getDmaapTopicName()),
+ () -> assertEquals("https", configuration.getDmaapProducerProperties().getDmaapProtocol()),
+ () -> assertEquals("some-user", configuration.getDmaapProducerProperties().getDmaapUserName()),
+ () -> assertEquals("some-password",
+ configuration.getDmaapProducerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapProducerProperties().getDmaapContentType())
);
assertAll("Generic Application Properties",
@@ -391,12 +423,11 @@
assertAll("Security Application Properties",
() -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()),
- () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
- () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()),
- () -> assertEquals("test key store password path - update",
+ () -> assertEquals("KeyStore-update.jks", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("KeyStorePass-update.txt",
aaiClientConfiguration.keyStorePasswordPath()),
- () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()),
- () -> assertEquals("test trust store password path - update",
+ () -> assertEquals("KeyStore-update.jks", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("KeyStorePass-update.txt",
aaiClientConfiguration.trustStorePasswordPath())
);
}
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
index 1acf864..1d1bce2 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
@@ -48,13 +48,13 @@
}
ConsulConfigurationGatewayTest() {
- ApplicationConfiguration configuration = Mockito.mock(ApplicationConfiguration.class);
+ var configuration = Mockito.mock(ApplicationConfiguration.class);
this.configurationGateway = new ConsulConfigurationGateway(configuration);
}
@Test
void passingValidJson_constructsGeneratedAppConfigObject() {
- final String validJson = "{"
+ final var validJson = "{"
+ "\"dmaap.protocol\": \"http\","
+ "\"dmaap.contentType\": \"application/json\","
+ "\"dmaap.consumer.consumerId\": \"c12\","
@@ -219,7 +219,7 @@
.streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
.build();
- ConsulConfigurationGateway spiedGateway = Mockito.spy(configurationGateway);
+ var spiedGateway = Mockito.spy(configurationGateway);
doReturn(false).when(spiedGateway).environmentNotReady();
assertEquals(expectedConfiguration,
spiedGateway.generateAppConfigObject(jsonParser.parse(validJson).getAsJsonObject()));
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java
index bacb6c3..8cc7d5c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java
@@ -42,7 +42,6 @@
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.MvcResult;
@WebMvcTest(BbsEventProcessorController.class)
@DisplayName("BBS Event Processor Controllers MVC Unit-Tests")
@@ -67,7 +66,7 @@
@Test
void sendingHeartBeatRestCall_RespondsWithAlive() throws Exception {
- MvcResult heartBeatResult = mockMvc.perform(get("/heartbeat")).andReturn();
+ var heartBeatResult = mockMvc.perform(get("/heartbeat")).andReturn();
mockMvc.perform(asyncDispatch(heartBeatResult))
.andExpect(status().isOk())
@@ -76,7 +75,7 @@
@Test
void sendingReRegistrationSubmissionRestCall_RespondsWithOk() throws Exception {
- MvcResult reregistrationSubmissionResult = mockMvc.perform(post("/poll-reregistration-events")).andReturn();
+ var reregistrationSubmissionResult = mockMvc.perform(post("/poll-reregistration-events")).andReturn();
mockMvc.perform(asyncDispatch(reregistrationSubmissionResult))
.andExpect(status().isOk())
@@ -86,7 +85,7 @@
@Test
void sendingCpeAuthenticationSubmissionRestCall_RespondsWithOk() throws Exception {
- MvcResult reregistrationSubmissionResult = mockMvc.perform(post("/poll-cpe-authentication-events")).andReturn();
+ var reregistrationSubmissionResult = mockMvc.perform(post("/poll-cpe-authentication-events")).andReturn();
mockMvc.perform(asyncDispatch(reregistrationSubmissionResult))
.andExpect(status().isOk())
@@ -97,7 +96,7 @@
@Test
void sendingStartTasksRestCall_ifItSucceeds_RespondsWithOk() throws Exception {
when(scheduler.reScheduleProcessingTasks()).thenReturn(true);
- MvcResult startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
+ var startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
mockMvc.perform(asyncDispatch(startTasksResult))
.andExpect(status().isOk())
@@ -108,7 +107,7 @@
@Test
void sendingStartTasksRestCall_ifItFails_RespondsWithNotAcceptable() throws Exception {
when(scheduler.reScheduleProcessingTasks()).thenReturn(false);
- MvcResult startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
+ var startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
mockMvc.perform(asyncDispatch(startTasksResult))
.andExpect(status().isNotAcceptable())
@@ -119,7 +118,7 @@
@Test
void sendingCancelTasksRestCall_ifItSucceeds_RespondsWithOk() throws Exception {
when(scheduler.cancelScheduledProcessingTasks()).thenReturn(true);
- MvcResult cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
+ var cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
mockMvc.perform(asyncDispatch(cancellationResult))
.andExpect(status().isOk())
@@ -130,7 +129,7 @@
@Test
void sendingCancelTasksRestCall_ifItFails_RespondsWithNotAcceptable() throws Exception {
when(scheduler.cancelScheduledProcessingTasks()).thenReturn(false);
- MvcResult cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
+ var cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
mockMvc.perform(asyncDispatch(cancellationResult))
.andExpect(status().isNotAcceptable())
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java
index fd43b8b..30cc4ab 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java
@@ -22,7 +22,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
-import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapterFactory;
@@ -42,13 +41,13 @@
@Test
void creatingReRegistrationJsonBody_returnsJsonInString() {
- String correlationId = "NokiaCorrelationId";
- String attachmentPoint = "olt2/1/1";
- String remoteId = "RemoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "NokiaCorrelationId";
+ var attachmentPoint = "olt2/1/1";
+ var remoteId = "RemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String template = "{"
+ var template = "{"
+ "\"correlationId\":\"%s\","
+ "\"attachment-point\":\"%s\","
+ "\"remote-id\":\"%s\","
@@ -65,7 +64,7 @@
.build();
- String expectedResult = String.format(template, correlationId, attachmentPoint, remoteId, cvlan, svlan);
+ var expectedResult = String.format(template, correlationId, attachmentPoint, remoteId, cvlan, svlan);
assertEquals(expectedResult, new ReRegistrationJsonBodyBuilder().createJsonBody(model));
}
@@ -73,14 +72,14 @@
@Test
void creatingCpeAuthenticationJsonBody_returnsJsonInString() {
- String correlationId = "NokiaCorrelationID";
- AuthenticationState oldAuthenticationState = AuthenticationState.IN_SERVICE;
- AuthenticationState newAuthenticationState = AuthenticationState.OUT_OF_SERVICE;
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var correlationId = "NokiaCorrelationID";
+ var oldAuthenticationState = AuthenticationState.IN_SERVICE;
+ var newAuthenticationState = AuthenticationState.OUT_OF_SERVICE;
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String template = "{"
+ var template = "{"
+ "\"correlationId\":\"%s\","
+ "\"old-authentication-state\":\"%s\","
+ "\"new-authentication-state\":\"%s\","
@@ -99,7 +98,7 @@
.build();
- String expectedResult = String.format(template, correlationId, oldAuthenticationState.getNameInOnap(),
+ var expectedResult = String.format(template, correlationId, oldAuthenticationState.getNameInOnap(),
newAuthenticationState.getNameInOnap(), stateInterface, rgwMacAddress, swVersion);
assertEquals(expectedResult, new CpeAuthenticationJsonBodyBuilder().createJsonBody(model));
@@ -108,27 +107,27 @@
@Test
void creatingDcaeControlLoopJsonBody_returnsJsonInString() {
- String closedLoopEventClient = "DCAE.BBS_mSInstance";
- String policyVersion = "1.0.0.5";
- String policyName = "CPE_Authentication";
- String policyScope =
+ var closedLoopEventClient = "DCAE.BBS_mSInstance";
+ var policyVersion = "1.0.0.5";
+ var policyName = "CPE_Authentication";
+ var policyScope =
"service=HSIAService,type=SampleType,"
+ "closedLoopControlName=CL-CPE_A-d925ed73-8231-4d02-9545-db4e101f88f8";
- String targetType = "VM";
- long closedLoopAlarmStart = 1484677482204798L;
- String closedLoopEventStatus = "ONSET";
- String closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
- String version = "1.0.2";
- String target = "vserver.vserver-name";
- String requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
- String from = "DCAE";
+ var targetType = "VM";
+ var closedLoopAlarmStart = 1484677482204798L;
+ var closedLoopEventStatus = "ONSET";
+ var closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
+ var version = "1.0.2";
+ var target = "vserver.vserver-name";
+ var requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
+ var from = "DCAE";
Map<String, String> aaiEnrichmentData = new LinkedHashMap<>();
aaiEnrichmentData.put("service-information.service-instance-id", "service-instance-id-example");
aaiEnrichmentData.put("cvlan-id", "example cvlan-id");
aaiEnrichmentData.put("svlan-id", "example svlan-id");
- String template = "{"
+ var template = "{"
+ "\"closedLoopEventClient\":\"%s\","
+ "\"policyVersion\":\"%s\","
+ "\"policyName\":\"%s\","
@@ -165,7 +164,7 @@
.originator(from)
.build();
- String expectedResult = String.format(template,
+ var expectedResult = String.format(template,
closedLoopEventClient,
policyVersion,
policyName,
@@ -185,14 +184,14 @@
@Test
void pnfAaiObject_IsSerializedSuccessfully() {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- Gson gson = gsonBuilder.create();
+ var gson = gsonBuilder.create();
- String pnfName = "NokiaCorrelationID";
- String swVersion = "1.2";
+ var pnfName = "NokiaCorrelationID";
+ var swVersion = "1.2";
- String template = "{"
+ var template = "{"
+ "\"pnf-name\":\"%s\","
+ "\"in-maint\":true,"
+ "\"sw-version\":\"%s\","
@@ -288,7 +287,7 @@
.build();
- String jsonPnfObject = String.format(template, pnfName, swVersion);
+ var jsonPnfObject = String.format(template, pnfName, swVersion);
assertEquals(jsonPnfObject, gson.toJson(pnfAaiObject));
assertEquals(pnfAaiObject, gson.fromJson(jsonPnfObject, ImmutablePnfAaiObject.class));
@@ -297,14 +296,14 @@
@Test
void serviceInstanceAaiObject_IsSerializedSuccessfully() {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- Gson gson = gsonBuilder.create();
+ var gson = gsonBuilder.create();
- String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
- String orchestrationStatus = "active";
+ var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
+ var orchestrationStatus = "active";
- String template = "{"
+ var template = "{"
+ "\"service-instance-id\":\"%s\","
+ "\"orchestration-status\":\"%s\","
+ "\"relationship-list\":{"
@@ -370,7 +369,7 @@
.build();
- String jsonServiceInstanceObject = String.format(template, serviceInstanceId, orchestrationStatus);
+ var jsonServiceInstanceObject = String.format(template, serviceInstanceId, orchestrationStatus);
assertEquals(jsonServiceInstanceObject, gson.toJson(serviceInstanceAaiObject));
assertEquals(serviceInstanceAaiObject, gson.fromJson(jsonServiceInstanceObject,
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
index c4bef9d..50fcdef 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.pipelines;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
@@ -64,8 +64,7 @@
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.springframework.http.HttpStatus;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -80,12 +79,12 @@
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private HttpResponse httpResponse;
+ private MessageRouterPublishResponse publishResponse;
@BeforeEach
void setup() {
- httpResponse = Mockito.mock(HttpResponse.class);
+ publishResponse = Mockito.mock(MessageRouterPublishResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
@@ -148,12 +147,12 @@
@Test
void noResponseFromAai_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -181,13 +180,13 @@
@Test
void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -199,12 +198,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
@@ -217,7 +216,7 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.never());
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -230,13 +229,13 @@
@Test
void singleCorrectEvent_handleSuccessfully() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -248,12 +247,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -266,13 +265,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -281,16 +280,16 @@
@Test
void twoCorrectEvents_handleSuccessfully() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress1 = "00:0a:95:8d:78:16";
- final String rgwMacAddress2 = "00:0a:95:8d:78:17";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress1 = "00:0a:95:8d:78:16";
+ var rgwMacAddress2 = "00:0a:95:8d:78:17";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
+ var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -310,19 +309,19 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
+ var pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
+ var pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
+ var hsiCfsServiceInstance1 =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
+ var hsiCfsServiceInstance2 =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
// Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
+ var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
+ var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance2.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -339,14 +338,14 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -355,12 +354,12 @@
@Test
void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -389,90 +388,16 @@
}
@Test
- void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
-
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress1 = "00:0a:95:8d:78:16";
- final String rgwMacAddress2 = "00:0a:95:8d:78:17";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
-
- // Prepare stubbed replies
- CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
- .correlationId(pnfName1)
- .oldAuthenticationState(oldAuthenticationState)
- .newAuthenticationState(newAuthenticationState)
- .stateInterface(stateInterface)
- .rgwMacAddress(rgwMacAddress1)
- .swVersion(swVersion)
- .build();
- CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
- .correlationId(pnfName2)
- .oldAuthenticationState(oldAuthenticationState)
- .newAuthenticationState(newAuthenticationState)
- .stateInterface(stateInterface)
- .rgwMacAddress(rgwMacAddress2)
- .swVersion(swVersion)
- .build();
-
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
- "Having unmatched RGW MAC address");
-
- // Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
- hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
- hsiCfsServiceInstance2.getServiceInstanceId());
-
- when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
- when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
- .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
-
- when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
- when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
-
- when(aaiClientTask
- .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
- .thenReturn(Mono.just(hsiCfsServiceInstance1));
- when(aaiClientTask
- .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
- .thenReturn(Mono.just(hsiCfsServiceInstance2));
-
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
-
- // Execute the pipeline
- StepVerifier.create(pipeline.executePipeline())
- .expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
- .verifyComplete();
-
- verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
- }
-
- @Test
void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -492,12 +417,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -510,13 +435,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -527,14 +452,14 @@
@Test
void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -554,12 +479,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -572,13 +497,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2))
@@ -630,7 +555,7 @@
private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
String pnfName,
String rgwMacAddress) {
- String orchestrationStatus = "active";
+ var orchestrationStatus = "active";
RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
ImmutableRelationshipEntryAaiObject.builder()
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
index 9453db3..815cb5c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.pipelines;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
@@ -64,8 +64,7 @@
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.springframework.http.HttpStatus;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -80,12 +79,12 @@
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private HttpResponse httpResponse;
+ private MessageRouterPublishResponse publishResponse;
@BeforeEach
void setup() {
- httpResponse = Mockito.mock(HttpResponse.class);
+ publishResponse = Mockito.mock(MessageRouterPublishResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
@@ -148,11 +147,11 @@
@Test
void noResponseFromAai_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -179,12 +178,12 @@
@Test
void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -195,12 +194,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
+ var pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
@@ -213,7 +211,7 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.never());
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -226,12 +224,12 @@
@Test
void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -242,12 +240,12 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -260,8 +258,8 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -276,12 +274,12 @@
@Test
void singleCorrectEvent_handleSuccessfully() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -292,12 +290,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
+ var pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -310,13 +307,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -325,17 +322,17 @@
@Test
void twoCorrectEvents_handleSuccessfully() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
+ var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -353,19 +350,17 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
+ var pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
+ var pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
+ var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
+ var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
// Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
+ var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
+ var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance2.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -382,14 +377,14 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -398,11 +393,11 @@
@Test
void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -432,17 +427,17 @@
@Test
void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
+ var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -460,19 +455,17 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
+ var pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
+ var pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
+ var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
+ var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
// Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
+ var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
+ var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance2.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -489,13 +482,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -504,16 +497,16 @@
@Test
void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -531,12 +524,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
+ var pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -549,13 +541,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -566,16 +558,16 @@
@Test
void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -593,12 +585,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
+ var pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -611,13 +602,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2))
@@ -719,7 +710,7 @@
private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
String pnfName,
String cvlan) {
- String orchestrationStatus = "active";
+ var orchestrationStatus = "active";
RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
ImmutableRelationshipEntryAaiObject.builder()
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java
index f721ca7..b037e2c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java
@@ -50,9 +50,9 @@
SchedulerTest() {
configuration = Mockito.mock(ApplicationConfiguration.class);
taskScheduler = Mockito.mock(TaskScheduler.class);
- ReRegistrationPipeline reRegistrationPipeline = Mockito.mock(ReRegistrationPipeline.class);
- CpeAuthenticationPipeline cpeAuthenticationPipeline = Mockito.mock(CpeAuthenticationPipeline.class);
- ConsulConfigurationGateway configurationGateway = Mockito.mock(ConsulConfigurationGateway.class);
+ var reRegistrationPipeline = Mockito.mock(ReRegistrationPipeline.class);
+ var cpeAuthenticationPipeline = Mockito.mock(CpeAuthenticationPipeline.class);
+ var configurationGateway = Mockito.mock(ConsulConfigurationGateway.class);
this.applicationScheduler = new Scheduler(configuration, configurationGateway, taskScheduler,
reRegistrationPipeline, cpeAuthenticationPipeline);
}
@@ -60,7 +60,7 @@
@Test
void scheduleTasksWithValidSchedulingPeriod_Succeeds() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture = Mockito.mock(ScheduledFuture.class);
when(taskScheduler.scheduleAtFixedRate(any(Runnable.class), any(Instant.class), any(Duration.class)))
.thenReturn(scheduledFuture);
@@ -75,8 +75,8 @@
@Test
void cancellingRunningTasksSucceeds_tasksAreDeleted() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.cancel(false)).thenReturn(true);
when(scheduledFuture2.cancel(false)).thenReturn(true);
when(scheduledFuture1.isCancelled()).thenReturn(true);
@@ -85,7 +85,7 @@
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.cancelScheduledProcessingTasks();
+ var result = applicationScheduler.cancelScheduledProcessingTasks();
assertAll("Successfully cancelling tasks",
() -> assertTrue(result, "Result of cancellation task"),
() -> assertEquals(0, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -97,8 +97,8 @@
@Test
void cancellingRunningTasksPartiallyFailing_tasksAreNotDeleted() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.cancel(false)).thenReturn(true);
when(scheduledFuture2.cancel(false)).thenReturn(false);
when(scheduledFuture1.isCancelled()).thenReturn(true);
@@ -107,7 +107,7 @@
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.cancelScheduledProcessingTasks();
+ var result = applicationScheduler.cancelScheduledProcessingTasks();
assertAll("Partially cancelling tasks",
() -> assertFalse(result, "Result of cancellation task"),
() -> assertEquals(1, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -119,8 +119,8 @@
@Test
void cancellingRunningTasksFailingForAllOfThem_noTasksAreDeleted() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.cancel(false)).thenReturn(false);
when(scheduledFuture2.cancel(false)).thenReturn(false);
when(scheduledFuture1.isCancelled()).thenReturn(false);
@@ -129,7 +129,7 @@
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.cancelScheduledProcessingTasks();
+ var result = applicationScheduler.cancelScheduledProcessingTasks();
assertAll("Failing in cancelling tasks",
() -> assertFalse(result, "Result of cancellation task"),
() -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -141,15 +141,15 @@
@Test
void reSchedulingWithExistingActiveTasks_Fails() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.isCancelled()).thenReturn(false);
when(scheduledFuture2.isCancelled()).thenReturn(false);
when(taskScheduler.scheduleAtFixedRate(any(Runnable.class), any(Instant.class), any(Duration.class)))
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.reScheduleProcessingTasks();
+ var result = applicationScheduler.reScheduleProcessingTasks();
assertAll("Rescheduling with active tasks",
() -> assertFalse(result, "Result of re-scheduling"),
() -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -162,11 +162,11 @@
void reSchedulingWithExistingCancelledTasks_Succeeds() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
// Initial tasks
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
// Re-scheduled tasks
- ScheduledFuture scheduledFuture3 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture4 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture3 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture4 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.isCancelled()).thenReturn(true);
when(scheduledFuture2.isCancelled()).thenReturn(true);
when(scheduledFuture3.isCancelled()).thenReturn(false);
@@ -178,7 +178,7 @@
.thenReturn(scheduledFuture4);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.reScheduleProcessingTasks();
+ var result = applicationScheduler.reScheduleProcessingTasks();
assertAll("Rescheduling with cancelled tasks",
() -> assertTrue(result, "Result of re-scheduling"),
() -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java
index db5f7cb..e13b8bc 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java
@@ -64,7 +64,7 @@
@BeforeEach
void init() {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
reactiveClient = Mockito.mock(AaiReactiveClient.class);
task = new AaiClientTaskImpl(reactiveClient);
@@ -73,7 +73,7 @@
@Test
void passingEmptyPnfObject_NothingHappens() throws AaiTaskException {
when(reactiveClient.getPnfObjectDataFor(any(String.class))).thenReturn(Mono.empty());
- Mono<PnfAaiObject> pnf = task.executePnfRetrieval("Empty PNF task", "some-url");
+ var pnf = task.executePnfRetrieval("Empty PNF task", "some-url");
verify(reactiveClient).getPnfObjectDataFor("some-url");
assertNull(pnf.block(Duration.ofSeconds(5)));
@@ -82,7 +82,7 @@
@Test
void passingEmptyServiceInstanceObject_NothingHappens() throws AaiTaskException {
when(reactiveClient.getServiceInstanceObjectDataFor(any(String.class))).thenReturn(Mono.empty());
- Mono<ServiceInstanceAaiObject> serviceInstance =
+ var serviceInstance =
task.executeServiceInstanceRetrieval("Empty Service Instance task", "some-url");
verify(reactiveClient).getServiceInstanceObjectDataFor("some-url");
@@ -92,8 +92,8 @@
@Test
void passingPnfObject_taskSucceeds() throws AaiTaskException {
- String pnfName = "pnf-1";
- String attachmentPoint = "olt1-1-1";
+ var pnfName = "pnf-1";
+ var attachmentPoint = "olt1-1-1";
// Build Relationship Data
RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
@@ -142,7 +142,7 @@
.build();
when(reactiveClient.getPnfObjectDataFor(any(String.class))).thenReturn(Mono.just(pnfAaiObject));
- Mono<PnfAaiObject> pnf = task.executePnfRetrieval("Normal PNF retrieval task", "some-url");
+ var pnf = task.executePnfRetrieval("Normal PNF retrieval task", "some-url");
verify(reactiveClient).getPnfObjectDataFor("some-url");
assertNotNull(pnf.block(Duration.ofSeconds(5)));
@@ -151,7 +151,7 @@
.expectSubscription()
.consumeNextWith(aPnf -> {
Assertions.assertEquals(pnfName, aPnf.getPnfName(), "PNF Name in response does not match");
- String extractedAttachmentPoint = aPnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var extractedAttachmentPoint = aPnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> e.getRelatedTo().equals("logical-link"))
.flatMap(e -> e.getRelationshipData().stream())
@@ -167,8 +167,8 @@
@Test
void passingServiceInstanceObject_taskSucceeds() throws AaiTaskException {
- String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
- String orchestrationStatus = "active";
+ var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
+ var orchestrationStatus = "active";
// Build Relationship Data
RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
@@ -207,7 +207,7 @@
when(reactiveClient.getServiceInstanceObjectDataFor(any(String.class)))
.thenReturn(Mono.just(serviceInstanceAaiObject));
- Mono<ServiceInstanceAaiObject> serviceInstance =
+ var serviceInstance =
task.executeServiceInstanceRetrieval("Normal Service Instance retrieval task",
"some-url");
@@ -220,10 +220,10 @@
Assertions.assertEquals(serviceInstanceId, instance.getServiceInstanceId(),
"Service Instance ID in response does not match");
- MetadataListAaiObject extractedMetadataListObject =
+ var extractedMetadataListObject =
instance.getMetadataListAaiObject().orElseThrow(AaiClientTaskTestException::new);
- MetadataListAaiObject.MetadataEntryAaiObject extractedMetadataEntry =
+ var extractedMetadataEntry =
extractedMetadataListObject.getMetadataEntries()
.stream()
.filter(m -> m.getMetaname().equals("cvlan"))
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
index 40bcb65..f644d44 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
@@ -20,38 +20,39 @@
package org.onap.bbs.event.processor.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
-import javax.net.ssl.SSLException;
import org.junit.Assert;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.DmaapCpeAuthenticationConsumerProperties;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class DmaapCpeAuthenticationConsumerTaskImplTest {
+ private static final String DMAAP_PROTOCOL = "http";
+ private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local";
+ private static final int DMAAP_PORT = 3904;
+ private static final String DMAAP_TOPIC = "unauthenticated.CPE_AUTHENTICATION";
+ private static final String SUBSCRIBER_ID = "subscriberID";
+ private static final String SUBSCRIBER_GROUP = "subscriberGroup";
+
private static final String CPE_AUTHENTICATION_EVENT_TEMPLATE = "{\"event\": {"
+ "\"commonEventHeader\": { \"sourceName\":\"%s\"},"
+ "\"stateChangeFields\": {"
@@ -65,32 +66,34 @@
private static DmaapCpeAuthenticationConsumerTask dmaapConsumerTask;
private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel;
- private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
private static String eventsArray;
+ private static MessageRouterSubscriber subscriber;
private static Gson gson = new Gson();
+ private static ApplicationConfiguration configuration;
@BeforeAll
- static void setUp() throws SSLException {
-
- final String sourceName = "PNF-CorrelationId";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
+ static void setUp() {
// Mock Re-registration configuration
- DmaapConsumerConfiguration dmaapConsumerConfiguration = testVersionOfDmaapConsumerConfiguration();
- ApplicationConfiguration configuration = mock(ApplicationConfiguration.class);
- when(configuration.getDmaapCpeAuthenticationConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ configuration = mock(ApplicationConfiguration.class);
+ var props = mock(DmaapCpeAuthenticationConsumerProperties.class);
+ when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL);
+ when(props.getDmaapHostName()).thenReturn(DMAAP_HOST);
+ when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT);
+ when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC);
+ when(props.getConsumerId()).thenReturn(SUBSCRIBER_ID);
+ when(props.getConsumerGroup()).thenReturn(SUBSCRIBER_GROUP);
+ when(configuration.getDmaapCpeAuthenticationConsumerProperties()).thenReturn(props);
- // Mock reactive DMaaP client
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration);
+ var subscriberConfig = mock(MessageRouterSubscriberConfig.class);
+ when(configuration.getDmaapCpeAuthenticationConsumerConfiguration()).thenReturn(subscriberConfig);
- dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration,
- new CpeAuthenticationDmaapConsumerJsonParser(), httpClientFactory);
+ var sourceName = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
cpeAuthenticationConsumerDmaapModel = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(sourceName)
@@ -101,58 +104,42 @@
.swVersion(swVersion)
.build();
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
eventsArray = "[" + event + "]";
}
- @AfterEach
- void resetMock() {
- reset(dMaaPConsumerReactiveHttpClient);
- }
-
@Test
void passingEmptyMessage_NothingHappens() throws Exception {
- JsonElement empty = gson.toJsonTree("");
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
+ var empty = gson.toJsonTree("");
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(empty));
+
+ dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, subscriber,
+ new CpeAuthenticationDmaapConsumerJsonParser());
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
+
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
@Test
void passingNormalMessage_ResponseSucceeds() throws Exception {
- JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
- .thenReturn(Mono.just(normalEventsArray));
+ var normalEventsArray = gson.toJsonTree(eventsArray);
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(normalEventsArray));
+
+ dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, subscriber,
+ new CpeAuthenticationDmaapConsumerJsonParser());
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
- }
-
- private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("consumer-group")
- .consumerId("consumer-id")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("change it")
- .trustStorePasswordPath("change_it")
- .keyStorePath("change it")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.CPE_AUTHENTICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .build();
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
index 436206d..7c15848 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
@@ -21,7 +21,6 @@
package org.onap.bbs.event.processor.tasks;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -30,55 +29,52 @@
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Optional;
-
-import javax.net.ssl.SSLException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.DmaapProducerProperties;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.springframework.http.HttpStatus;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class DmaapPublisherTaskImplTest {
+ private static final String DMAAP_PROTOCOL = "http";
+ private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local";
+ private static final int DMAAP_PORT = 3904;
+ private static final String DMAAP_TOPIC = "unauthenticated.DCAE_CL_OUTPUT";
+
private static ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel;
private static DmaapPublisherTaskImpl task;
- private static DMaaPPublisherReactiveHttpClient reactiveHttpClient;
private static ApplicationConfiguration configuration;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
@BeforeAll
static void setUp() {
- dmaapPublisherConfiguration = testVersionOfDmaapPublisherConfiguration();
configuration = mock(ApplicationConfiguration.class);
- final String closedLoopEventClient = "DCAE.BBS_mSInstance";
- final String policyVersion = "1.0.0.5";
- final String policyName = "CPE_Authentication";
- final String policyScope =
+ final var closedLoopEventClient = "DCAE.BBS_mSInstance";
+ final var policyVersion = "1.0.0.5";
+ final var policyName = "CPE_Authentication";
+ final var policyScope =
"service=HSIAService,type=SampleType,"
+ "closedLoopControlName=CL-CPE_A-d925ed73-8231-4d02-9545-db4e101f88f8";
- final String targetType = "VM";
- final long closedLoopAlarmStart = 1484677482204798L;
- final String closedLoopEventStatus = "ONSET";
- final String closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
- final String version = "1.0.2";
- final String target = "vserver.vserver-name";
- final String requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
- final String from = "DCAE";
+ final var targetType = "VM";
+ final var closedLoopAlarmStart = 1484677482204798L;
+ final var closedLoopEventStatus = "ONSET";
+ final var closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
+ final var version = "1.0.2";
+ final var target = "vserver.vserver-name";
+ final var requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
+ final var from = "DCAE";
final Map<String, String> aaiEnrichmentData = new LinkedHashMap<>();
aaiEnrichmentData.put("service-information.service-instance-id", "service-instance-id-example");
@@ -100,77 +96,64 @@
.requestId(requestId)
.originator(from)
.build();
+ var props = mock(DmaapProducerProperties.class);
+ when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL);
+ when(props.getDmaapHostName()).thenReturn(DMAAP_HOST);
+ when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT);
+ when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC);
+ when(configuration.getDmaapProducerProperties()).thenReturn(props);
- when(configuration.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+ var publisherConfig = mock(MessageRouterPublisherConfig.class);
+ when(configuration.getDmaapPublisherConfiguration()).thenReturn(publisherConfig);
+
+
}
@Test
void passingNullMessage_ExceptionIsRaised() {
- task = new DmaapPublisherTaskImpl(configuration);
-
Executable executableFunction = () -> task.execute(null);
Assertions.assertThrows(DmaapException.class, executableFunction, "Input message is invalid");
}
@Test
- void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException {
- HttpResponse response = setupMocks(HttpStatus.OK.value());
+ void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException {
+ var publisher = mock(MessageRouterPublisher.class);
+ task = new DmaapPublisherTaskImpl(configuration, publisher);
- StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(response).verifyComplete();
+ var response = mockResponse(true);
+ when(publisher.put(any(),any())).thenReturn(Flux.just(response));
- verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- verifyNoMoreInteractions(reactiveHttpClient);
+ StepVerifier.create(task.execute(controlLoopPublisherDmaapModel))
+ .expectSubscription()
+ .assertNext(r -> Assertions.assertTrue(r.successful()))
+ .verifyComplete();
+
+ verify(publisher, times(1)).put(any(),any());
+ verifyNoMoreInteractions(publisher);
}
@Test
- void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException {
- HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value());
+ void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException {
+ var publisher = mock(MessageRouterPublisher.class);
+ task = new DmaapPublisherTaskImpl(configuration, publisher);
- StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(response).verifyComplete();
+ var response = mockResponse(false);
+ when(publisher.put(any(),any())).thenReturn(Flux.just(response));
- verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- verifyNoMoreInteractions(reactiveHttpClient);
+ StepVerifier.create(task.execute(controlLoopPublisherDmaapModel))
+ .expectSubscription()
+ .assertNext(r -> Assertions.assertFalse(r.successful()))
+ .verifyComplete();
+
+ verify(publisher, times(1)).put(any(),any());
+ verifyNoMoreInteractions(publisher);
}
- // We can safely suppress unchecked assignment warning here since it is a mock class
- @SuppressWarnings("unchecked")
- private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException {
-
- HttpResponse response = mock(HttpResponse.class);
- when(response.statusCode()).thenReturn(httpResponseCode);
-
- reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
- when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class)))
- .thenReturn(Mono.just(response));
-
- PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
- doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration);
-
- task = new DmaapPublisherTaskImpl(configuration, httpClientFactory);
-
+ private MessageRouterPublishResponse mockResponse(boolean isSuccess) {
+ var response = mock(MessageRouterPublishResponse.class);
+ when(response.successful()).thenReturn(isSuccess);
return response;
}
-
- private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("/opt/app/bbs/local/org.onap.bbs.trust.jks")
- .trustStorePasswordPath("change_it")
- .keyStorePath("/opt/app/bbs/local/org.onap.bbs.p12")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.DCAE_CL_OUTPUT")
- .build();
- }
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
index 72e2898..edbe511 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
@@ -20,38 +20,39 @@
package org.onap.bbs.event.processor.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
-import javax.net.ssl.SSLException;
import org.junit.Assert;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.DmaapReRegistrationConsumerProperties;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class DmaapReRegistrationConsumerTaskImplTest {
+ private static final String DMAAP_PROTOCOL = "http";
+ private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local";
+ private static final int DMAAP_PORT = 3904;
+ private static final String DMAAP_TOPIC = "unauthenticated.PNF_REREGISTRATION";
+ private static final String SUBSCRIBER_ID = "subscriberID";
+ private static final String SUBSCRIBER_GROUP = "subscriberGroup";
+
private static final String RE_REGISTRATION_EVENT_TEMPLATE = "{\"event\": {"
+ "\"commonEventHeader\": { \"sourceName\":\"%s\"},"
+ "\"additionalFields\": {"
@@ -63,31 +64,33 @@
private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask;
private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel;
- private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
private static String eventsArray;
+ private static MessageRouterSubscriber subscriber;
private static Gson gson = new Gson();
+ private static ApplicationConfiguration configuration;
@BeforeAll
- static void setUp() throws SSLException {
-
- final String sourceName = "PNF-CorrelationId";
- final String attachmentPoint = "olt2/2/2";
- final String remoteId = "remoteId";
- final String cvlan = "1005";
- final String svlan = "100";
+ static void setUp() {
// Mock Re-registration configuration
- DmaapConsumerConfiguration dmaapConsumerConfiguration = testVersionOfDmaapConsumerConfiguration();
- ApplicationConfiguration configuration = mock(ApplicationConfiguration.class);
- when(configuration.getDmaapReRegistrationConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ configuration = mock(ApplicationConfiguration.class);
+ var props = mock(DmaapReRegistrationConsumerProperties.class);
+ when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL);
+ when(props.getDmaapHostName()).thenReturn(DMAAP_HOST);
+ when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT);
+ when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC);
+ when(props.getConsumerId()).thenReturn(SUBSCRIBER_ID);
+ when(props.getConsumerGroup()).thenReturn(SUBSCRIBER_GROUP);
+ when(configuration.getDmaapReRegistrationConsumerProperties()).thenReturn(props);
- // Mock reactive DMaaP client
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration);
+ var subscriberConfig = mock(MessageRouterSubscriberConfig.class);
+ when(configuration.getDmaapReRegistrationConsumerConfiguration()).thenReturn(subscriberConfig);
- dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration,
- new ReRegistrationDmaapConsumerJsonParser(), httpClientFactory);
+ var sourceName = "PNF-CorrelationId";
+ var attachmentPoint = "olt2/2/2";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
reRegistrationConsumerDmaapModel = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(sourceName)
@@ -97,58 +100,43 @@
.sVlan(svlan)
.build();
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
cvlan, svlan);
eventsArray = "[" + event + "]";
}
- @AfterEach
- void resetMock() {
- reset(dMaaPConsumerReactiveHttpClient);
- }
-
@Test
void passingEmptyMessage_NothingHappens() {
- JsonElement empty = gson.toJsonTree("");
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
+ var empty = gson.toJsonTree("");
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(empty));
+
+ dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, subscriber,
+ new ReRegistrationDmaapConsumerJsonParser());
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
+
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
@Test
void passingNormalMessage_ResponseSucceeds() {
- JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
- .thenReturn(Mono.just(normalEventsArray));
+ System.out.println("Events sent : " + eventsArray);
+ var normalEventsArray = gson.toJsonTree(eventsArray);
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(normalEventsArray));
+
+ dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, subscriber,
+ new ReRegistrationDmaapConsumerJsonParser());
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
- }
-
- private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("/opt/app/bbs/local/org.onap.bbs.trust.jks")
- .trustStorePasswordPath("change_it")
- .keyStorePath("/opt/app/bbs/local/org.onap.bbs.p12")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.PNF_REREGISTRATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .build();
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java
index 8e3c46b..2876dd9 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java
@@ -37,8 +37,6 @@
import java.util.HashMap;
import java.util.ServiceLoader;
-import javax.net.ssl.SSLException;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -74,13 +72,13 @@
private static WireMockServer wireMockServer;
@BeforeAll
- static void init() throws SSLException {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ static void init() {
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
gson = gsonBuilder.create();
- ApplicationConfiguration configuration = Mockito.mock(ApplicationConfiguration.class);
- AaiClientConfiguration aaiClientConfiguration = Mockito.mock(AaiClientConfiguration.class);
+ var configuration = Mockito.mock(ApplicationConfiguration.class);
+ var aaiClientConfiguration = Mockito.mock(AaiClientConfiguration.class);
when(configuration.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
when(aaiClientConfiguration.aaiUserName()).thenReturn("AAI");
when(aaiClientConfiguration.aaiUserPassword()).thenReturn("AAI");
@@ -106,10 +104,10 @@
@Test
void sendingReactiveRequestForPnf_Succeeds() {
- String pnfName = "pnf-1";
- String attachmentPoint = "olt1-1-1";
+ var pnfName = "pnf-1";
+ var attachmentPoint = "olt1-1-1";
- String pnfUrl = String.format("/aai/v14/network/pnfs/pnf/%s?depth=1", pnfName);
+ var pnfUrl = String.format("/aai/v14/network/pnfs/pnf/%s?depth=1", pnfName);
// Build Relationship Data
RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
@@ -166,7 +164,7 @@
.expectSubscription()
.consumeNextWith(pnf -> {
Assertions.assertEquals(pnfName, pnf.getPnfName(), "PNF Name in response does not match");
- String extractedAttachmentPoint = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var extractedAttachmentPoint = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> e.getRelatedTo().equals("logical-link"))
.flatMap(e -> e.getRelationshipData().stream())
@@ -182,10 +180,10 @@
@Test
void sendingReactiveRequestForServiceInstance_Succeeds() {
- String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
- String orchestrationStatus = "active";
+ var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
+ var orchestrationStatus = "active";
- String serviceInstanceUrl =
+ var serviceInstanceUrl =
String.format("/aai/v14/nodes/service-instances/service-instance/%s?format=resource_and_url",
serviceInstanceId);
@@ -237,10 +235,10 @@
Assertions.assertEquals(serviceInstanceId, serviceInstance.getServiceInstanceId(),
"Service Instance ID in response does not match");
- MetadataListAaiObject extractedMetadataListObject =
+ var extractedMetadataListObject =
serviceInstance.getMetadataListAaiObject().orElseThrow(AaiReactiveClientTestException::new);
- MetadataListAaiObject.MetadataEntryAaiObject extractedMetadataEntry =
+ var extractedMetadataEntry =
extractedMetadataListObject.getMetadataEntries()
.stream()
.filter(m -> m.getMetaname().equals("cvlan"))
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
index c7ad793..d7970ae 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
@@ -104,9 +104,8 @@
@Test
void passingNonJson_getIllegalStateException() {
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- new CpeAuthenticationDmaapConsumerJsonParser();
- JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ var consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser();
+ var jsonReader = new JsonReader(new StringReader("not JSON"));
jsonReader.setLenient(true);
JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
@@ -118,8 +117,7 @@
@Test
void passingNoEvents_EmptyFluxIsReturned() {
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- new CpeAuthenticationDmaapConsumerJsonParser();
+ var consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser();
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
@@ -129,23 +127,22 @@
@Test
void passingOneCorrectEvent_validationSucceeds() {
- String sourceName = "PNF-CorrelationId";
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var sourceName = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
CpeAuthenticationConsumerDmaapModel expectedEventObject = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(sourceName)
@@ -164,30 +161,29 @@
@Test
void passingTwoCorrectEvents_validationSucceeds() {
- String sourceName1 = "PNF-CorrelationId";
- String sourceName2 = "PNF-CorrelationId";
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress1 = "00:0a:95:8d:78:16";
- String rgwMacAddress2 = "00:0a:95:8d:78:17";
- String swVersion = "1.2";
+ var sourceName1 = "PNF-CorrelationId";
+ var sourceName2 = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress1 = "00:0a:95:8d:78:16";
+ var rgwMacAddress2 = "00:0a:95:8d:78:17";
+ var swVersion = "1.2";
- String firstEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName1, oldAuthenticationState,
+ var firstEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName1, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress1, swVersion);
- String secondEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName2, oldAuthenticationState,
+ var secondEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName2, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress2, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement1 = jsonParser.parse(firstEvent);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement1 = jsonParser.parse(firstEvent);
Mockito.doReturn(Optional.of(jsonElement1.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement1);
- JsonElement jsonElement2 = jsonParser.parse(secondEvent);
+ var jsonElement2 = jsonParser.parse(secondEvent);
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
+ var eventsArray = "[" + firstEvent + "," + secondEvent + "]";
CpeAuthenticationConsumerDmaapModel expectedFirstEventObject =
ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -217,22 +213,21 @@
@Test
void passingJsonWithMissingAuthenticationState_validationFails() {
- String sourceName = "PNF-CorrelationId";
- String oldAuthenticationState = "outOfService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var sourceName = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_AUTHENTICATION_STATE, sourceName,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_AUTHENTICATION_STATE, sourceName,
oldAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -242,22 +237,21 @@
@Test
void passingJsonWithMissingSourceName_validationFails() {
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME,
oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -267,22 +261,21 @@
@Test
void passingJsonWithMissingSourceNameValue_validationFails() {
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME_VALUE,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME_VALUE,
oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -292,22 +285,21 @@
@Test
void passingJsonWithMissingStateChangeFieldsHeader_validationFails() {
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_STATE_CHANGE_FIELDS,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_STATE_CHANGE_FIELDS,
oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
index cd238e2..6d78826 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
@@ -93,8 +93,8 @@
@Test
void passingNonJson_getIllegalStateException() {
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
- JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ var consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
+ var jsonReader = new JsonReader(new StringReader("not JSON"));
jsonReader.setLenient(true);
JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
@@ -105,7 +105,7 @@
@Test
void passingNoEvents_EmptyFluxIsReturned() {
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
+ var consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
.verifyComplete();
@@ -114,21 +114,21 @@
@Test
void passingOneCorrectEvent_validationSucceeds() {
- String correlationId = "PNF-CorrelationId";
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "PNF-CorrelationId";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId, attachmentPoint,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId, attachmentPoint,
remoteId, cvlan, svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
ReRegistrationConsumerDmaapModel expectedEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(correlationId)
@@ -146,29 +146,29 @@
@Test
void passingTwoCorrectEvents_validationSucceeds() {
- String correlationId1 = "PNF-CorrelationId1";
- String correlationId2 = "PNF-CorrelationId2";
- String attachmentPoint1 = "olt1/1/1";
- String attachmentPoint2 = "olt2/2/2";
- String remoteId1 = "remoteId1";
- String remoteId2 = "remoteId2";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId1 = "PNF-CorrelationId1";
+ var correlationId2 = "PNF-CorrelationId2";
+ var attachmentPoint1 = "olt1/1/1";
+ var attachmentPoint2 = "olt2/2/2";
+ var remoteId1 = "remoteId1";
+ var remoteId2 = "remoteId2";
+ var cvlan = "1005";
+ var svlan = "100";
- String firstEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
+ var firstEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
remoteId1, cvlan, svlan);
- String secondEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
+ var secondEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
remoteId1, cvlan, svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement1 = jsonParser.parse(firstEvent);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement1 = jsonParser.parse(firstEvent);
Mockito.doReturn(Optional.of(jsonElement1.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement1);
- JsonElement jsonElement2 = jsonParser.parse(secondEvent);
+ var jsonElement2 = jsonParser.parse(secondEvent);
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
+ var eventsArray = "[" + firstEvent + "," + secondEvent + "]";
ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(correlationId1)
@@ -194,23 +194,23 @@
@Test
void passingJsonWithMissingAttachmentPoint_validationFails() {
- String correlationId = "PNF-CorrelationId";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "PNF-CorrelationId";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ATTACHMENT_POINT,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ATTACHMENT_POINT,
correlationId,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -220,23 +220,23 @@
@Test
void passingJsonWithMissingCorrelationId_validationFails() {
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID,
attachmentPoint,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -246,23 +246,23 @@
@Test
void passingJsonWithMissingCorrelationIdValue_validationFails() {
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID_VALUE,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID_VALUE,
attachmentPoint,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -272,25 +272,25 @@
@Test
void passingJsonWithMissingAdditionalFields_validationFails() {
- String correlationId = "PNF-CorrelationId";
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "PNF-CorrelationId";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ADDITIONAL_FIELDS,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ADDITIONAL_FIELDS,
correlationId,
attachmentPoint,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
diff --git a/components/bbs-event-processor/src/test/resources/logback-test.xml b/components/bbs-event-processor/src/test/resources/logback-test.xml
index 0b93a43..dd7dcb7 100644
--- a/components/bbs-event-processor/src/test/resources/logback-test.xml
+++ b/components/bbs-event-processor/src/test/resources/logback-test.xml
@@ -20,5 +20,5 @@
<include resource="org/springframework/boot/logging/logback/base.xml" />
<root level="ERROR"/>
<logger name="org.springframework" level="ERROR"/>
- <logger name="org.onap" level="WARN"/>
+ <logger name="org.onap.bbs" level="WARN"/>
</configuration>
\ No newline at end of file
diff --git a/components/bbs-event-processor/version.properties b/components/bbs-event-processor/version.properties
index 7b8b963..358e99c 100644
--- a/components/bbs-event-processor/version.properties
+++ b/components/bbs-event-processor/version.properties
@@ -1,5 +1,5 @@
-major=1
-minor=1
+major=2
+minor=0
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}