Merge "Provision MM at target MR"
diff --git a/pom.xml b/pom.xml
index 90cc026..6517cd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -387,7 +387,7 @@
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<jettyVersion>9.4.12.RC2</jettyVersion> 
 		<eelf.version>1.0.0</eelf.version>
-		<artifact.version>1.0.25-SNAPSHOT</artifact.version>
+		<artifact.version>1.0.26-SNAPSHOT</artifact.version>
 		<!-- SONAR -->
 		<jacoco.version>0.7.7.201606060606</jacoco.version>
 		<sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
diff --git a/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java b/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
index a92dbc7..28a9add 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
@@ -31,8 +31,10 @@
 import java.net.URL;
 import java.net.HttpURLConnection;
 
+import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
 
 import org.apache.commons.codec.binary.Base64;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
@@ -49,13 +51,14 @@
 	private  String mmProvCred; 
 	private	String unit_test;
 	private boolean useAAF;
-
+	private boolean hostnameVerify;
 
 	public MrTopicConnection(String user, String pwd ) {
 		mmProvCred = new String( user + ":" + pwd );
 		DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
         unit_test = p.getProperty( "UnitTest", "No" );
     	useAAF= "true".equalsIgnoreCase(p.getProperty("UseAAF", "false"));
+    	hostnameVerify= "true".equalsIgnoreCase(p.getProperty("MR.hostnameVerify", "true"));
 	}
 	
 	public boolean makeTopicConnection( MR_Cluster cluster, String topic, String overrideFqdn ) {
@@ -71,13 +74,28 @@
 		return makeConnection( topicURL );
 	}
 
+	
 	private boolean makeSecureConnection( String pURL ) {
 		logger.info( "makeConnection to " + pURL );
-	
+		
 		try {
+			HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+				@Override
+				public boolean verify( String hostname, SSLSession session ) {
+					return true;
+				}
+			
+			};
+	
+		
 			URL u = new URL( pURL );
-			uc = (HttpsURLConnection) u.openConnection();
+			uc = (HttpsURLConnection) u.openConnection();			
 			uc.setInstanceFollowRedirects(false);
+			if ( ! hostnameVerify ) {
+				HttpsURLConnection ucs = (HttpsURLConnection) uc;
+				ucs.setHostnameVerifier(hostnameVerifier);
+			}
+	
 			logger.info( "open connection to " + pURL );
 			return(true);
 		} catch (Exception e) {
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
index da9d822..5d695f4 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
@@ -57,6 +57,7 @@
 	private static String defaultConsumerPort;
 	private static String centralFqdn;
 	private int maxTopicsPerMM;
+	private boolean mmPerMR;
 	
 	public MirrorMakerService() {
 		super();
@@ -68,6 +69,7 @@
 		defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");	
 		centralFqdn = p.getProperty("MR.CentralCname", "notSet");
 		maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
+		mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
 	}
 
 	// will create a MM on MMagent if needed
@@ -79,34 +81,43 @@
 	
 		DmaapService dmaap = new DmaapService();
 		MR_ClusterService clusters = new MR_ClusterService();
-	
-		// in 1610, MM should only exist for edge-to-central
-		//  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
-		// but only send 1 message so MM Agents can read it relying on kafka delivery
-		for( MR_Cluster central: clusters.getCentralClusters() ) {
-			prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-			ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
-			if ( ! resp.is2xx() ) {
-	
-				errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+		MR_Cluster target_cluster = null;
+		String override = null;
+		
+		if ( ! mmPerMR ) {
+			// in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
+			//  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
+			// but only send 1 message so MM Agents can read it relying on kafka delivery
+			for( MR_Cluster cluster: clusters.getCentralClusters() ) {
+
+				target_cluster = cluster;
+				override = centralFqdn;
+				// we only want to send one message even if there are multiple central clusters
+				break;
+			
+			} 
+		} else {
+			// In ONAP deployment architecture, the MM Agent is deployed with each target MR
+			target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
+			override = null;
+		}
+		
+		prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override  );
+		ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
+		if ( ! resp.is2xx() ) {
+
+			errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+			mm.setStatus(DmaapObject_Status.INVALID);
+		} else {
+			prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+			resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
+			if ( ! resp.is2xx()) {
+				errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
 				mm.setStatus(DmaapObject_Status.INVALID);
 			} else {
-				prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
-				resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
-				if ( ! resp.is2xx()) {
-					errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
-					mm.setStatus(DmaapObject_Status.INVALID);
-				} else {
-					mm.setStatus(DmaapObject_Status.VALID);
-				}
+				mm.setStatus(DmaapObject_Status.VALID);
 			}
-			
-			// we only want to send one message even if there are multiple central clusters
-			break;
-		
-		} 
-		
-
+		}
 
 		mm.setLastMod();
 		return mirrors.put( mm.getMmName(), mm);
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
index c5937f4..a7991e8 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
@@ -70,6 +70,7 @@
 	private static String centralCname;
 	private static boolean createTopicRoles;
 	private boolean strictGraph = true;
+	private boolean mmPerMR;
 
 
 	public TopicService(){
@@ -81,9 +82,11 @@
 		if ( unit_test.equals( "Yes" ) ) {
 			strictGraph = false;
 		}
+		mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
 		logger.info( "TopicService properties: CentralCname=" + centralCname + 
 				"   defaultGlobarlMrHost=" + defaultGlobalMrHost +
-				" createTopicRoles=" + createTopicRoles );
+				" createTopicRoles=" + createTopicRoles +
+				" mmPerMR=" + mmPerMR );
 	}
 	
 	public Map<String, Topic> getTopics() {			
@@ -451,11 +454,11 @@
 				case REPLICATION_EDGE_TO_CENTRAL:
 				case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
 					source = cluster.getFqdn();
-					target = centralCname;
+					target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
 					break;
 				case REPLICATION_CENTRAL_TO_EDGE:
 				case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-					source = centralCname;
+					source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
 					target = cluster.getFqdn();
 					break;
 				case REPLICATION_CENTRAL_TO_GLOBAL:
diff --git a/version.properties b/version.properties
index 1eff060..8a9bf52 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
 
 major=1
 minor=0
-patch=25
+patch=26
 base_version=${major}.${minor}.${patch}
 
 # Release must be completed with git revision # in Jenkins