Implement secure communications

The communication towards the consumer is not secured in this commit.

Also changed the configuration so that the address of the DMaaP Message
Router is given in one variable, named DMAAP_MR_ADDR.

Issue-ID: NONRTRIC-601
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Icb5b3c255367e823fcae2168ab37603092815893
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 1a91af4..cdfde57 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -21,9 +21,9 @@
 package main
 
 import (
+	"crypto/tls"
 	"fmt"
 	"net/http"
-	"sync"
 	"time"
 
 	"github.com/hashicorp/go-retryablehttp"
@@ -34,13 +34,7 @@
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
-const timeoutDistributionClient = time.Second * 5
-const retryWaitMax = time.Minute
-const retryMax = int(^uint(0) >> 1)
-
 var configuration *config.Config
-var retryClient restclient.HTTPClient
-var jobHandler *jobs.JobHandlerImpl
 
 func init() {
 	configuration = config.New()
@@ -54,52 +48,68 @@
 	}
 	callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
 
-	distributionClient := &http.Client{
-		Timeout: timeoutDistributionClient,
+	var retryClient restclient.HTTPClient
+	if cert, err := createClientCertificate(); err == nil {
+		retryClient = createRetryClient(cert)
+	} else {
+		log.Fatalf("Stopping producer due to error: %v", err)
 	}
 
-	rawRetryClient := retryablehttp.NewClient()
-	rawRetryClient.RetryWaitMax = retryWaitMax
-	rawRetryClient.RetryMax = retryMax
-	retryClient = rawRetryClient.StandardClient()
-
-	jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient)
-	if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
+	jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{
+		Timeout: time.Second * 5,
+	})
+	if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
 		log.Fatalf("Stopping producer due to: %v", err)
 	}
 
 	log.Debug("Starting DMaaP Mediator Producer")
-	wg := new(sync.WaitGroup)
-
-	// add two goroutines to `wg` WaitGroup, one for each running go routine
-	wg.Add(2)
-
-	log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
 	go func() {
+		log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
 		r := server.NewRouter(jobHandler)
-		log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
-		wg.Done()
+		log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
 	}()
 
-	go func() {
-		jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
-		wg.Done()
-	}()
+	go jobHandler.RunJobs(configuration.DMaaPMRAddress)
 
-	// wait until WaitGroup is done
-	wg.Wait()
-	log.Debug("Stopping DMaaP Mediator Producer")
+	keepProducerAlive()
 }
 
 func validateConfiguration(configuration *config.Config) error {
 	if configuration.InfoProducerHost == "" {
 		return fmt.Errorf("missing INFO_PRODUCER_HOST")
 	}
+	if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
+		return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
+	}
 	return nil
 }
 
-func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
-	registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
+func createClientCertificate() (*tls.Certificate, error) {
+	if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
+		return &cert, nil
+	} else {
+		return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s", configuration.ProducerCertPath, configuration.ProducerKeyPath)
+	}
+}
+
+func createRetryClient(cert *tls.Certificate) *http.Client {
+	rawRetryClient := retryablehttp.NewClient()
+	rawRetryClient.RetryWaitMax = time.Minute
+	rawRetryClient.RetryMax = int(^uint(0) >> 1)
+	rawRetryClient.HTTPClient.Transport = &http.Transport{
+		TLSClientConfig: &tls.Config{
+			Certificates: []tls.Certificate{
+				*cert,
+			},
+			InsecureSkipVerify: true,
+		},
+	}
+
+	return rawRetryClient.StandardClient()
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+	registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
 	if types, err := jobHandler.GetTypes(); err == nil {
 		if regErr := registrator.RegisterTypes(types); regErr != nil {
 			return fmt.Errorf("unable to register all types due to: %v", regErr)
@@ -117,3 +127,8 @@
 	}
 	return nil
 }
+
+func keepProducerAlive() {
+	forever := make(chan int)
+	<-forever
+}