Merge "Provision MM at target MR"
diff --git a/pom.xml b/pom.xml
index 90cc026..6517cd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -387,7 +387,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jettyVersion>9.4.12.RC2</jettyVersion>
<eelf.version>1.0.0</eelf.version>
- <artifact.version>1.0.25-SNAPSHOT</artifact.version>
+ <artifact.version>1.0.26-SNAPSHOT</artifact.version>
<!-- SONAR -->
<jacoco.version>0.7.7.201606060606</jacoco.version>
<sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
diff --git a/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java b/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
index a92dbc7..28a9add 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
@@ -31,8 +31,10 @@
import java.net.URL;
import java.net.HttpURLConnection;
+import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
import org.apache.commons.codec.binary.Base64;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
@@ -49,13 +51,14 @@
private String mmProvCred;
private String unit_test;
private boolean useAAF;
-
+ private boolean hostnameVerify;
/**
 * Builds a topic connection helper for the given provisioning credentials.
 *
 * Besides storing the credential, the constructor snapshots three settings from
 * DmaapConfig: "UnitTest" (default "No"), "UseAAF" (default "false") and
 * "MR.hostnameVerify" (default "true"). The two boolean settings are true only
 * when the property value equals "true", ignoring case.
 *
 * @param user user name placed before the ':' in the stored credential
 * @param pwd  password placed after the ':' in the stored credential
 */
public MrTopicConnection(String user, String pwd ) {
// Credential is kept in "user:pwd" form.
// NOTE(review): presumably Base64-encoded later for HTTP Basic auth - confirm against the request code.
mmProvCred = new String( user + ":" + pwd );
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
unit_test = p.getProperty( "UnitTest", "No" );
useAAF= "true".equalsIgnoreCase(p.getProperty("UseAAF", "false"));
// Hostname verification stays enabled unless MR.hostnameVerify is set to something other than "true";
// when it is disabled, makeSecureConnection installs a verifier that accepts any hostname.
+ hostnameVerify= "true".equalsIgnoreCase(p.getProperty("MR.hostnameVerify", "true"));
}
public boolean makeTopicConnection( MR_Cluster cluster, String topic, String overrideFqdn ) {
@@ -71,13 +74,28 @@
return makeConnection( topicURL );
}
+
private boolean makeSecureConnection( String pURL ) {
logger.info( "makeConnection to " + pURL );
-
+
try {
+ HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+ @Override
+ public boolean verify( String hostname, SSLSession session ) {
+ return true;
+ }
+
+ };
+
+
URL u = new URL( pURL );
- uc = (HttpsURLConnection) u.openConnection();
+ uc = (HttpsURLConnection) u.openConnection();
uc.setInstanceFollowRedirects(false);
+ if ( ! hostnameVerify ) {
+ HttpsURLConnection ucs = (HttpsURLConnection) uc;
+ ucs.setHostnameVerifier(hostnameVerifier);
+ }
+
logger.info( "open connection to " + pURL );
return(true);
} catch (Exception e) {
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
index da9d822..5d695f4 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
@@ -57,6 +57,7 @@
private static String defaultConsumerPort;
private static String centralFqdn;
private int maxTopicsPerMM;
+ private boolean mmPerMR;
public MirrorMakerService() {
super();
@@ -68,6 +69,7 @@
defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");
centralFqdn = p.getProperty("MR.CentralCname", "notSet");
maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
+ mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
}
// will create a MM on MMagent if needed
@@ -79,34 +81,43 @@
DmaapService dmaap = new DmaapService();
MR_ClusterService clusters = new MR_ClusterService();
-
- // in 1610, MM should only exist for edge-to-central
- // we use a cname for the central MR cluster that is active, and provision on agent topic on that target
- // but only send 1 message so MM Agents can read it relying on kafka delivery
- for( MR_Cluster central: clusters.getCentralClusters() ) {
- prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
- if ( ! resp.is2xx() ) {
-
- errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+ MR_Cluster target_cluster = null;
+ String override = null;
+
+ if ( ! mmPerMR ) {
+ // In ECOMP, the MM Agent is only deployed at central, so this case is needed for backwards compatibility.
+ // We use a cname for the central MR cluster that is active, and provision on the agent topic at that target,
+ // but send only 1 message, relying on kafka delivery so that the MM Agents can read it.
+ for( MR_Cluster cluster: clusters.getCentralClusters() ) {
+
+ target_cluster = cluster;
+ override = centralFqdn;
+ // we only want to send one message even if there are multiple central clusters
+ break;
+
+ }
+ } else {
+ // In ONAP deployment architecture, the MM Agent is deployed with each target MR
+ target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
+ override = null;
+ }
+
+ prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+ ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
+ if ( ! resp.is2xx() ) {
+
+ errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+ mm.setStatus(DmaapObject_Status.INVALID);
+ } else {
+ prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+ resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
+ if ( ! resp.is2xx()) {
+ errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
} else {
- prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
- if ( ! resp.is2xx()) {
- errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
- mm.setStatus(DmaapObject_Status.INVALID);
- } else {
- mm.setStatus(DmaapObject_Status.VALID);
- }
+ mm.setStatus(DmaapObject_Status.VALID);
}
-
- // we only want to send one message even if there are multiple central clusters
- break;
-
- }
-
-
+ }
mm.setLastMod();
return mirrors.put( mm.getMmName(), mm);
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
index c5937f4..a7991e8 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
@@ -70,6 +70,7 @@
private static String centralCname;
private static boolean createTopicRoles;
private boolean strictGraph = true;
+ private boolean mmPerMR;
public TopicService(){
@@ -81,9 +82,11 @@
if ( unit_test.equals( "Yes" ) ) {
strictGraph = false;
}
+ mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
logger.info( "TopicService properties: CentralCname=" + centralCname +
" defaultGlobarlMrHost=" + defaultGlobalMrHost +
- " createTopicRoles=" + createTopicRoles );
+ " createTopicRoles=" + createTopicRoles +
+ " mmPerMR=" + mmPerMR );
}
public Map<String, Topic> getTopics() {
@@ -451,11 +454,11 @@
case REPLICATION_EDGE_TO_CENTRAL:
case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
source = cluster.getFqdn();
- target = centralCname;
+ target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
break;
case REPLICATION_CENTRAL_TO_EDGE:
case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
- source = centralCname;
+ source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
target = cluster.getFqdn();
break;
case REPLICATION_CENTRAL_TO_GLOBAL:
diff --git a/version.properties b/version.properties
index 1eff060..8a9bf52 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
major=1
minor=0
-patch=25
+patch=26
base_version=${major}.${minor}.${patch}
# Release must be completed with git revision # in Jenkins