Cluster distributed data store
Add experimental cluster co-ordination service using the Atomix framework.
Included distributed data store creation utilities.
Sample docker compose data cluster between cds controller and resource-resolution instances.
Issue-ID: CCSDK-2000
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I4de00e773a996e08fd1d260fc27ed18832433883
diff --git a/ms/blueprintsprocessor/application/pom.xml b/ms/blueprintsprocessor/application/pom.xml
index f4d784b..a566c84 100755
--- a/ms/blueprintsprocessor/application/pom.xml
+++ b/ms/blueprintsprocessor/application/pom.xml
@@ -95,6 +95,12 @@
<version>0.7.0-SNAPSHOT</version>
</dependency>
+ <!-- Libs -->
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>atomix-lib</artifactId>
+ </dependency>
+
<!-- Functions -->
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId>
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
new file mode 100644
index 0000000..f4b4b79
--- /dev/null
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -0,0 +1,85 @@
+version: '3.7'
+
+services:
+ db:
+ image: mariadb:latest
+ container_name: ccsdk-mariadb
+ networks:
+ - cds-network
+ ports:
+ - "3306:3306"
+ volumes:
+ - ~/vm_mysql:/var/lib/mysql
+ restart: always
+ environment:
+ MYSQL_ROOT_PASSWORD: sdnctl
+ MYSQL_DATABASE: sdnctl
+ MYSQL_USER: sdnctl
+ MYSQL_PASSWORD: sdnctl
+ cds-controller-1:
+ depends_on:
+ - db
+ image: onap/ccsdk-blueprintsprocessor:latest
+ container_name: cds-controller-1
+ hostname: cds-controller-1
+ networks:
+ - cds-network
+ ports:
+ - "8000:8080"
+ - "9111:9111"
+ restart: always
+ volumes:
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ - target: /opt/app/onap/config
+ type: bind
+ source: ./config
+ environment:
+ # CLUSTER_NODE_ID must be the same as the hostname and container name
+ CLUSTER_ID: cds-cluster
+ CLUSTER_NODE_ID: cds-controller-1
+ CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+ CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
+ #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ APPLICATIONNAME: cds-controller
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ resource-resolution-1:
+ depends_on:
+ - db
+ image: onap/ccsdk-blueprintsprocessor:latest
+ container_name: resource-resolution-1
+ hostname: resource-resolution-1
+ networks:
+ - cds-network
+ ports:
+ - "8001:8080"
+ - "9112:9111"
+ restart: always
+ volumes:
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ - target: /opt/app/onap/config
+ type: bind
+ source: ./config
+ environment:
+ CLUSTER_ID: cds-cluster
+ CLUSTER_NODE_ID: resource-resolution-1
+ CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+ CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
+ #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ APPLICATIONNAME: resource-resolution
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+volumes:
+ blueprints-deploy:
+
+networks:
+ cds-network:
+ driver: bridge
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index 0ff04bf..d877702 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -1,9 +1,11 @@
-version: '3.3'
+version: '3.7'
services:
db:
image: mariadb:latest
container_name: ccsdk-mariadb
+ networks:
+ - cds-network
ports:
- "3306:3306"
volumes:
@@ -20,6 +22,8 @@
image: onap/ccsdk-blueprintsprocessor:latest
container_name: cds-controller-default
hostname: cds-controller-default
+ networks:
+ - cds-network
ports:
- "8000:8080"
- "9111:9111"
@@ -37,6 +41,8 @@
- db
image: onap/ccsdk-commandexecutor:latest
container_name: bp-command-executor
+ networks:
+ - cds-network
ports:
- "50051:50051"
restart: always
@@ -48,6 +54,8 @@
image: onap/ccsdk-py-executor
container_name: py-executor-default
hostname: py-executor-default
+ networks:
+ - cds-network
ports:
- "50052:50052"
restart: always
@@ -65,3 +73,7 @@
volumes:
blueprints-deploy:
+
+networks:
+ cds-network:
+ driver: bridge
diff --git a/ms/blueprintsprocessor/application/src/main/docker/startService.sh b/ms/blueprintsprocessor/application/src/main/docker/startService.sh
index 7dcb5ff..f5967dc 100644
--- a/ms/blueprintsprocessor/application/src/main/docker/startService.sh
+++ b/ms/blueprintsprocessor/application/src/main/docker/startService.sh
@@ -2,7 +2,7 @@
nodeName=BlueprintsProcessor_1.0.0_$(cat /proc/self/cgroup | grep docker | sed s/\\//\\n/g | tail -1)
-echo "APP Config HOME : ${APP_CONFIG_HOME}"
+echo "${CLUSTER_ID}:${CLUSTER_NODE_ID} APP Config HOME : ${APP_CONFIG_HOME}"
export APP_HOME=/opt/app/onap
keytool -import -noprompt -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer
@@ -18,4 +18,10 @@
-Djava.security.egd=file:/dev/./urandom \
-DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
-Dspring.config.location=${APP_CONFIG_HOME}/ \
+-DCLUSTER_ID=${CLUSTER_ID} \
+-DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \
+-DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \
+-DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \
+-DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \
+-DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \
org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
new file mode 100644
index 0000000..b78ebf6
--- /dev/null
+++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor
+
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.context.event.ApplicationReadyEvent
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Component
+import java.time.Duration
+import javax.annotation.PreDestroy
+
+/**
+ * To start the cluster, a minimum of 2 instances/replicas of CDS are needed.
+ * All instances such as Blueprintprocessor, ResourceResolution, MessagePrioritization should be in the
+ * same cluster and should have the same cluster name.
+ *
+ * Data can be shared only within a cluster; it can't be shared outside the cluster.
+ * If a cds-controller-x instance wants to share data with a resource-resolution-x instance, then both should be
+ * in the same cluster (cds-cluster) and the same network (cds-network).
+ *
+ * Assumptions:
+ * 1. Container, Pod and Host names are the same.
+ * 2. Container names should end with a sequence number.
+ *      Blueprintprocessor example : cds-controller-1, cds-controller-2, cds-controller-3
+ *      ResourceResolution example : resource-resolution-1, resource-resolution-2, resource-resolution-3
+ * 3. Each container should have the properties CLUSTER_ID, CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS,
+ *    CLUSTER_MEMBERS, CLUSTER_STORAGE_PATH (CLUSTER_CONFIG_FILE is optional).
+ *      Example values :
+ *      CLUSTER_ID: cds-cluster
+ *      CLUSTER_NODE_ID: cds-controller-2
+ *      CLUSTER_NODE_ADDRESS: cds-controller-2
+ *      CLUSTER_MEMBERS: cds-controller-1,cds-controller-2,cds-controller-3,resource-resolution-1,resource-resolution-2,resource-resolution-3
+ *      CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
+ *      CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ * 4. The cluster is enabled only when CLUSTER_ID is present and not blank; in that case all the other
+ *    mandatory properties listed above must be present as well, otherwise start-up fails.
+ */
+@Component
+open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) {
+
+    private val log = logger(BluePrintProcessorCluster::class)
+
+    /**
+     * Reads the cluster settings from the JVM system properties once the application is ready and, when
+     * CLUSTER_ID is set, starts/joins the cluster through the [BluePrintClusterService].
+     *
+     * @throws BluePrintProcessorException when the cluster is enabled but a mandatory property is missing or blank.
+     */
+    @EventListener(ApplicationReadyEvent::class)
+    fun startAndJoinCluster() = runBlocking {
+        val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID)
+
+        if (!clusterId.isNullOrBlank()) {
+
+            val nodeId = requiredProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
+
+            val nodeAddress = requiredProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
+
+            // Drop blank entries so a trailing comma or stray whitespace doesn't produce an empty member id.
+            val clusterMemberList = requiredProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
+                .split(",")
+                .map { it.trim() }
+                .filter { it.isNotEmpty() }
+
+            val clusterStorage = requiredProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
+
+            // Optional: may be null or empty, the cluster service decides what to do without it.
+            val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
+
+            val clusterInfo = ClusterInfo(
+                id = clusterId, nodeId = nodeId,
+                clusterMembers = clusterMemberList, nodeAddress = nodeAddress,
+                storagePath = clusterStorage,
+                configFile = clusterConfigFile
+            )
+            bluePrintClusterService.startCluster(clusterInfo)
+        } else {
+            log.info(
+                "Cluster is disabled, to enable cluster set the environment " +
+                    "properties[CLUSTER_ID, CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS, CLUSTER_STORAGE_PATH]"
+            )
+        }
+    }
+
+    /** Asks the cluster service to shut down, passing a one second duration. */
+    @PreDestroy
+    fun shutDown() = runBlocking {
+        bluePrintClusterService.shutDown(Duration.ofSeconds(1))
+    }
+
+    /**
+     * Returns the system property [name], treating a blank value the same as a missing one: a launcher that
+     * forwards an unset environment variable as `-DNAME=` yields an empty string rather than null.
+     */
+    private fun requiredProperty(name: String): String =
+        System.getProperty(name)?.takeIf { it.isNotBlank() }
+            ?: throw BluePrintProcessorException("couldn't get environment variable $name")
+}
diff --git a/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf
new file mode 100644
index 0000000..0fc31e0
--- /dev/null
+++ b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-bootstrap.conf
@@ -0,0 +1,35 @@
+cluster {
+  # Configure the cluster node information.
+  node {
+    id: ${CLUSTER_NODE_ID}
+    address: ${CLUSTER_NODE_ADDRESS}
+  }
+  # Configure the node discovery protocol.
+  discovery {
+    type: bootstrap
+    # Bootstrap node ids/addresses must match the container host names of the cluster members.
+    nodes.1 {
+      id: cds-controller-1
+      address: "cds-controller-1:5679"
+    }
+    nodes.2 {
+      id: resource-resolution-1
+      address: "resource-resolution-1:5679"
+    }
+  }
+}
+# Configure the system management group.
+managementGroup {
+ type: raft
+ name: system
+ partitions: 1
+ members: [${CLUSTER_MEMBERS}]
+ storage {
+ directory: ${CLUSTER_STORAGE_PATH}/data-${CLUSTER_NODE_ID}
+ level: DISK
+ }
+}
+# Configure a primary-backup partition group for data.
+partitionGroups.data {
+ type: primary-backup
+ partitions: 7
+}
diff --git a/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf
new file mode 100644
index 0000000..fd16187
--- /dev/null
+++ b/ms/blueprintsprocessor/application/src/main/resources/atomix/atomix-multicast.conf
@@ -0,0 +1,40 @@
+cluster {
+ # Configure the cluster node information.
+ node {
+ id: ${CLUSTER_NODE_ID}
+ address: ${CLUSTER_NODE_ADDRESS}
+ }
+ # Configure the node discovery protocol.
+ discovery {
+ type: multicast
+ }
+ multicast: {
+ enabled: true
+ port: 54321
+ }
+ # Configure the SWIM membership protocol.
+ protocol {
+ type: swim
+ broadcastUpdates: true
+ gossipInterval: 500ms
+ probeInterval: 2s
+ suspectProbes: 2
+ }
+}
+# Configure the system management group.
+managementGroup {
+ type: raft
+ name: system
+ partitions: 1
+ members: [${CLUSTER_MEMBERS}]
+ storage {
+ directory: ${CLUSTER_STORAGE_PATH}/data-${CLUSTER_NODE_ID}
+ level: DISK
+ }
+}
+
+# Configure a primary-backup partition group for data.
+partitionGroups.data {
+ type: primary-backup
+ partitions: 7
+}
diff --git a/ms/blueprintsprocessor/application/src/main/resources/logback.xml b/ms/blueprintsprocessor/application/src/main/resources/logback.xml
index e1389a6..d58be8a 100644
--- a/ms/blueprintsprocessor/application/src/main/resources/logback.xml
+++ b/ms/blueprintsprocessor/application/src/main/resources/logback.xml
@@ -39,6 +39,7 @@
<logger name="org.springframework" level="info"/>
<logger name="org.springframework.web" level="info"/>
<logger name="org.hibernate" level="error"/>
+ <logger name="io.atomix" level="warn"/>
<logger name="org.onap.ccsdk.cds" level="info"/>
<root level="info">
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index fcc921c..caf0631 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -217,4 +217,12 @@
const val MODEL_TYPE_ARTIFACT_COMPONENT_JAR = "artifact-component-jar"
val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
+
+ /** Cluster Properties */
+ const val PROPERTY_CLUSTER_ID = "CLUSTER_ID"
+ const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID"
+ const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS"
+ const val PROPERTY_CLUSTER_MEMBERS = "CLUSTER_MEMBERS"
+ const val PROPERTY_CLUSTER_STORAGE_PATH = "CLUSTER_STORAGE_PATH"
+ const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE"
}
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
new file mode 100644
index 0000000..d3d6210
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.utils
+
+import java.net.InetAddress
+
+object ClusterUtils {
+
+    /**
+     * Returns the host name reported for the local host address.
+     */
+    fun hostname(): String =
+        InetAddress.getLocalHost().hostName
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml
new file mode 100644
index 0000000..7fa7b45
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>commons</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>atomix-lib</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Blueprints Processor Atomix Lib</name>
+ <description>Blueprints Processor Atomix Lib</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-raft</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-primary-backup</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-gossip</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>db-lib</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
new file mode 100644
index 0000000..8ea1593
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+open class BluePrintAtomixLibConfiguration
+
+/**
+ * Dependency-service accessor published by the Atomix lib module: looks up the
+ * [AtomixBluePrintClusterService] instance and exposes it as a [BluePrintClusterService].
+ */
+fun BluePrintDependencyService.clusterService(): BluePrintClusterService {
+    return instance(AtomixBluePrintClusterService::class)
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
new file mode 100644
index 0000000..696d728
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import io.atomix.core.map.DistributedMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+/**
+ * Narrows this map to an Atomix [DistributedMap].
+ *
+ * @return the receiver itself, typed as [DistributedMap]
+ * @throws BluePrintProcessorException when the receiver is not a [DistributedMap]
+ */
+fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> =
+    this as? DistributedMap<*, *>
+        ?: throw BluePrintProcessorException("map is not of type DistributedMap")
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
new file mode 100644
index 0000000..27921be
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
@@ -0,0 +1,118 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
+
+import io.atomix.cluster.ClusterMembershipEvent
+import io.atomix.core.Atomix
+import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.stereotype.Service
+import java.time.Duration
+import java.util.concurrent.CompletableFuture
+
+/**
+ * [BluePrintClusterService] implementation backed by an [Atomix] instance.
+ */
+@Service
+open class AtomixBluePrintClusterService : BluePrintClusterService {
+
+    private val log = logger(AtomixBluePrintClusterService::class)
+
+    /** Atomix instance; it is only assigned by [startCluster] and stays uninitialized until then. */
+    lateinit var atomix: Atomix
+
+    private var joined = false
+
+    /**
+     * Builds the Atomix instance for [clusterInfo], starts it, subscribes to the "ping" subject and
+     * broadcasts a ping; blocks until the Atomix start future completes.
+     */
+    override suspend fun startCluster(clusterInfo: ClusterInfo) {
+        log.info(
+            "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
+                "starting with members(${clusterInfo.clusterMembers})"
+        )
+
+        /** Create Atomix cluster either from config file or default multi-cast cluster */
+        atomix = clusterInfo.configFile
+            ?.takeIf { it.isNotEmpty() }
+            ?.let { AtomixLibUtils.configAtomix(it) }
+            ?: AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
+
+        /** Listen for the member change events */
+        atomix.membershipService.addListener { membershipEvent ->
+            when (membershipEvent.type()) {
+                ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
+                ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
+                ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed")
+                else -> log.info("***** Member event unknown")
+            }
+        }
+        atomix.start().join()
+        log.info(
+            "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
+                "created successfully...."
+        )
+
+        /** Receive ping from network */
+        val pingHandler = { message: String ->
+            log.info("####### ping message received : $message")
+            CompletableFuture.completedFuture(message)
+        }
+        atomix.communicationService.subscribe("ping", pingHandler)
+
+        /** Ping the network */
+        atomix.communicationService.broadcast(
+            "ping",
+            "ping from node(${clusterInfo.nodeId})"
+        )
+        joined = true
+    }
+
+    /** Returns true once [startCluster] has completed and until [shutDown] stops the cluster. */
+    override fun clusterJoined(): Boolean {
+        return joined
+    }
+
+    /**
+     * Returns every member currently known to the Atomix membership service.
+     *
+     * @throws IllegalStateException when the cluster was never started or is not running.
+     */
+    override suspend fun allMembers(): Set<ClusterMember> {
+        checkClusterRunning()
+        return atomix.membershipService.members.map {
+            ClusterMember(
+                id = it.id().id(),
+                memberAddress = it.host()
+            )
+        }.toSet()
+    }
+
+    /**
+     * Returns the members whose id starts with [memberPrefix] (case-insensitive).
+     *
+     * @throws IllegalStateException when the cluster was never started or is not running.
+     */
+    override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
+        checkClusterRunning()
+
+        return atomix.membershipService.members.filter {
+            it.id().id().startsWith(memberPrefix, true)
+        }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
+            .toSet()
+    }
+
+    /**
+     * Returns the distributed map store registered under [name].
+     *
+     * @throws IllegalStateException when the cluster was never started or is not running.
+     */
+    override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+        // Guard the lateinit access so callers get the same IllegalStateException as the other accessors.
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        return AtomixLibUtils.distributedMapStore<T>(atomix, name)
+    }
+
+    /**
+     * Waits for [duration] and then stops the Atomix instance. Does nothing when the cluster was never
+     * started, so it is safe to call from a shutdown hook even if clustering is disabled.
+     */
+    override suspend fun shutDown(duration: Duration) {
+        if (!::atomix.isInitialized) {
+            log.info("Received cluster shutdown request, but cluster was not started; nothing to stop")
+            return
+        }
+        val shutDownMilli = duration.toMillis()
+        log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+        delay(shutDownMilli)
+        // NOTE(review): the future returned by stop() is not awaited - confirm whether shutdown should block on it.
+        atomix.stop()
+        joined = false
+    }
+
+    /** Fails with [IllegalStateException] unless [atomix] has been created and reports itself running. */
+    private fun checkClusterRunning() {
+        check(::atomix.isInitialized) { "failed to start and join cluster" }
+        check(atomix.isRunning) { "cluster is not running" }
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
new file mode 100644
index 0000000..6e726a1
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.node.MissingNode
+import com.fasterxml.jackson.databind.node.NullNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import io.atomix.core.Atomix
+import io.atomix.core.map.DistributedMap
+import io.atomix.protocols.backup.MultiPrimaryProtocol
+import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
+import io.atomix.protocols.raft.partition.RaftPartitionGroup
+import io.atomix.utils.net.Address
+import org.jsoup.nodes.TextNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+
+/**
+ * Factory helpers for creating [Atomix] instances and distributed stores.
+ */
+object AtomixLibUtils {
+
+    /** Builds an [Atomix] instance from the configuration file located at [filePath]. */
+    fun configAtomix(filePath: String): Atomix {
+        val configFile = normalizedFile(filePath)
+        return Atomix.builder(configFile.absolutePath).build()
+    }
+
+    /**
+     * Builds a multicast-enabled [Atomix] instance from [clusterInfo]: a Raft management group named
+     * "system" stored under `<storagePath>/data-<nodeId>` and a primary-backup partition group named "data".
+     */
+    fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+
+        val nodeId = clusterInfo.nodeId
+
+        val raftPartitionGroup = RaftPartitionGroup.builder("system")
+            .withNumPartitions(7)
+            .withMembers(clusterInfo.clusterMembers)
+            .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
+            .build()
+
+        val primaryBackupGroup =
+            PrimaryBackupPartitionGroup.builder("data")
+                .withNumPartitions(31)
+                .build()
+
+        return Atomix.builder()
+            .withMemberId(nodeId)
+            .withAddress(Address.from(clusterInfo.nodeAddress))
+            .withManagementGroup(raftPartitionGroup)
+            .withPartitionGroups(primaryBackupGroup)
+            .withMulticastEnabled()
+            .build()
+    }
+
+    /**
+     * Creates (or looks up) the cache-enabled distributed map named [storeName], replicated with a
+     * multi-primary protocol using 2 backups and registered for Jackson JSON node value types.
+     *
+     * @throws IllegalStateException when [atomix] is not running.
+     */
+    fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+        check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
+
+        val protocol = MultiPrimaryProtocol.builder()
+            .withBackups(2)
+            .build()
+
+        return atomix.mapBuilder<String, T>(storeName)
+            .withProtocol(protocol)
+            .withCacheEnabled()
+            .withValueType(JsonNode::class.java)
+            .withExtraTypes(
+                JsonNode::class.java,
+                // Fully qualified on purpose: the simple name TextNode is bound by this file's imports to
+                // org.jsoup.nodes.TextNode, which is not a Jackson node type.
+                com.fasterxml.jackson.databind.node.TextNode::class.java,
+                ObjectNode::class.java,
+                ArrayNode::class.java, NullNode::class.java, MissingNode::class.java
+            )
+            .build()
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
new file mode 100644
index 0000000..919d671
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+
+import com.fasterxml.jackson.databind.JsonNode
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import kotlin.test.assertNotNull
+
+class AtomixBluePrintClusterServiceTest {
+    val log = logger(AtomixBluePrintClusterServiceTest::class)
+
+    /** Clears the "target/cluster" storage directory before each test. */
+    @Before
+    fun init() {
+        runBlocking { deleteNBDir("target/cluster") }
+    }
+
+    /**
+     * Starts a two node cluster and creates distributed map stores on both nodes.
+     * This is a time consuming test case, it takes around 10s.
+     */
+    @Test
+    fun testClusterJoin() {
+        runBlocking {
+            val members = arrayListOf("node-5679", "node-5680")
+            // Nodes are started in parallel, presumably because each start waits for its peer (TODO confirm).
+            val deferred = arrayListOf(5679, 5680).map { port ->
+                async(Dispatchers.IO) {
+                    val nodeId = "node-$port"
+                    log.info("********** Starting node($nodeId) on port($port)")
+                    val clusterInfo = ClusterInfo(
+                        id = "test-cluster", nodeId = nodeId,
+                        clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+                    )
+                    val atomixClusterService = AtomixBluePrintClusterService()
+                    atomixClusterService.startCluster(clusterInfo)
+                    atomixClusterService.atomix
+                }
+            }
+            val atomix = deferred.awaitAll()
+            /** Test Distributed store creation */
+            repeat(2) { storeId ->
+                val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix[0], "blueprint-runtime-$storeId")
+                assertNotNull(store, "failed to get store")
+                val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix[1], "blueprint-runtime-$storeId")
+                store1.addListener { log.info("Received map event : $it") }
+                repeat(10) { store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() }
+                delay(100)
+                store.close()
+                store1.close() // release the second node's handle as well
+            }
+        }
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..f1c625e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
@@ -0,0 +1,36 @@
+<!--
+ ~ Copyright © 2017-2018 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- No encoder class is given here, so logback uses its default,
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate" level="info"/>
+ <logger name="io.atomix" level="warn"/> <!-- Atomix loggers report warnings and errors only -->
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
diff --git a/ms/blueprintsprocessor/modules/commons/pom.xml b/ms/blueprintsprocessor/modules/commons/pom.xml
index 30c34ab..78c5691 100755
--- a/ms/blueprintsprocessor/modules/commons/pom.xml
+++ b/ms/blueprintsprocessor/modules/commons/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>processor-core</module>
+ <module>atomix-lib</module>
<module>db-lib</module>
<module>rest-lib</module>
<module>dmaap-lib</module>
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
new file mode 100644
index 0000000..bbaa427
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+
+import java.time.Duration
+
+interface BluePrintClusterService {
+
+ /** Start the cluster with [clusterInfo]. Clustering is disabled by default, the application module has to start it. */
+ suspend fun startCluster(clusterInfo: ClusterInfo)
+
+ /** NOTE(review): presumably true once this node has joined the cluster, confirm with the implementation. */
+ fun clusterJoined(): Boolean
+
+ /** Returns all the data cluster members. */
+ suspend fun allMembers(): Set<ClusterMember>
+
+ /** Returns the data cluster members matching [memberPrefix] (presumably a member id prefix, confirm with the implementation). */
+ suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember>
+
+ /** Returns the distributed data map store with [name], creating it when it does not exist yet. */
+ suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
+
+ /** Shut down the cluster; [duration] is presumably the time allowed for the shutdown, confirm with the implementation. */
+ suspend fun shutDown(duration: Duration)
+}
+
+data class ClusterInfo(
+ val id: String, // identifier of the cluster
+ var configFile: String? = null, // NOTE(review): presumably an Atomix configuration file path, confirm where it is read
+ val nodeId: String, // identifier of this node, the Atomix utilities use it as the member id
+ val nodeAddress: String, // address of this node, the Atomix utilities parse it with Address.from
+ var clusterMembers: List<String>, // members handed to the Raft management group by the Atomix utilities
+ var storagePath: String // base directory, the Atomix utilities keep Raft data in "<storagePath>/data-<nodeId>"
+)
+
+data class ClusterMember(val id: String, val memberAddress: String?) // a data cluster member: its id and optional address
diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml
index b806397..091d9dc 100755
--- a/ms/blueprintsprocessor/parent/pom.xml
+++ b/ms/blueprintsprocessor/parent/pom.xml
@@ -25,6 +25,7 @@
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Blueprints Processor Parent</name>
@@ -296,6 +297,28 @@
<version>${netty-ssl}</version>
</dependency>
+ <!-- Atomix -->
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix</artifactId>
+ <version>${atomix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-raft</artifactId>
+ <version>${atomix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-primary-backup</artifactId>
+ <version>${atomix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-gossip</artifactId>
+ <version>${atomix.version}</version>
+ </dependency>
+
<!-- Adaptors -->
<dependency>
<groupId>org.apache.sshd</groupId>
@@ -378,6 +401,11 @@
</dependency>
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>atomix-lib</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>db-lib</artifactId>
<version>${project.version}</version>
</dependency>