Introduce choice between secure and non-secure communication
The DMaaP Mediator producer now uses secure communication towards any
address configured to use the https scheme. If its own callback is
configured to the https scheme, it will only listen to an https port,
otherwise it will only use an http port.
Also moved the certificate and key files to a separate folder.
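The scheme-based listener choice described above can be sketched as
follows. This is only an illustration, not the code in this change: the
helper name `listenerKind` and the example addresses are assumptions,
and anything that does not parse as an https URL is treated as plain http.

```go
package main

import (
	"fmt"
	"net/url"
)

// listenerKind is a hypothetical helper that reports which kind of
// listener a callback address implies: "https" when the URL scheme is
// https, otherwise "http" (including when the address cannot be parsed).
func listenerKind(callback string) string {
	u, err := url.Parse(callback)
	if err != nil || u.Scheme != "https" {
		return "http"
	}
	return "https"
}

func main() {
	// Example addresses only; the host name and port are placeholders.
	fmt.Println(listenerKind("https://mrproducer:8085"))
	fmt.Println(listenerKind("http://mrproducer:8085"))
}
```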
Issue-ID: NONRTRIC-633
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Id3bb18e83d40d26740b71be7901911c9a41502e3
diff --git a/dmaap-mediator-producer/Dockerfile b/dmaap-mediator-producer/Dockerfile
index ffc1794..bc09fdc 100644
--- a/dmaap-mediator-producer/Dockerfile
+++ b/dmaap-mediator-producer/Dockerfile
@@ -35,5 +35,6 @@
## Copy from "build" stage
COPY --from=build /dmaapmediatorproducer .
COPY --from=build /app/configs/* /configs/
+COPY --from=build /app/security/* /security/
USER nonroot:nonroot
ENTRYPOINT ["/dmaapmediatorproducer"]
diff --git a/dmaap-mediator-producer/README.md b/dmaap-mediator-producer/README.md
index c22c561..90f8471 100644
--- a/dmaap-mediator-producer/README.md
+++ b/dmaap-mediator-producer/README.md
@@ -2,15 +2,17 @@
This product is a producer of Information Coordinator Service (ICS) jobs for polling topics in DMaaP Message Router (MR) and pushing the messages to a consumer.
+## Configuration
+
The producer takes a number of environment variables, described below, as configuration.
->- INFO_PRODUCER_HOST **Required**. The host for the producer. Example: `http://mrproducer`
->- LOG_LEVEL Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`. Defaults to `Info`.
+>- INFO_PRODUCER_HOST **Required**. The host for the producer. Example: `https://mrproducer`
>- INFO_PRODUCER_PORT Optional. The port for the product. Defaults to `8085`.
>- INFO_COORD_ADDR Optional. The address of the Information Coordinator. Defaults to `https://enrichmentservice:8434`.
>- DMAAP_MR_ADDR Optional. The address of the DMaaP Message Router. Defaults to `https://message-router.onap:3905`.
->- PRODUCER_CERT_PATH Optional. The path to the certificate to use for https. Defaults to `configs/producer.crt`
->- PRODUCER_KEY_PATH Optional. The path to the key to the certificate to use for https. Defaults to `configs/producer.key`
+>- PRODUCER_CERT_PATH Optional. The path to the certificate to use for https. Defaults to `security/producer.crt`
+>- PRODUCER_KEY_PATH Optional. The path to the key to the certificate to use for https. Defaults to `security/producer.key`
+>- LOG_LEVEL Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`. Defaults to `Info`.
The file `configs/type_config.json` contains the configuration of job types that the producer will support.
@@ -24,12 +26,34 @@
]
}
-The server part of the producer uses https, and the communication towards ICS and MR use https, with no server certificate verification.
+Any of the addresses used by this product can be configured to use https by specifying it as the scheme of the address URI. Clients configured to use https will not verify the server certificate. The communication towards the consumers will use https if their callback address URI uses that scheme. The producer's own callback server will only listen on the scheme given in the info producer host address (`INFO_PRODUCER_HOST`).
-At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types.
+The configured private key and certificate shall be PEM-encoded. A self-signed certificate and key are provided in the `security` folder of the project. These files should be replaced for production. To generate a self-signed key and certificate, use the example code below:
+
+ openssl req -new -x509 -sha256 -newkey rsa:2048 -nodes -keyout server.key -out server.crt -days 3650
+
+## Functionality
+
+At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will keep retrying the connection indefinitely. The same goes for MR.
Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+## Development
+
+To make it easy to test during development of the producer, two stubs are provided in the `stub` folder.
+
+One, under the `dmaap` folder, called `dmaap`, stubs MR and responds with an array containing one message, with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+>1. cd stub/dmaap
+>2. go build
+>3. ./dmaap
+
+The other, under the `consumer` folder, called `consumer`, registers a job of type `STD_Fault_Messages` in ICS at startup, and then listens for REST calls and prints their bodies. By default, it listens on port `40935`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+>1. cd stub/consumer
+>2. go build
+>3. ./consumer
+
+Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
+
## License
Copyright (C) 2021 Nordix Foundation.
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index b31b334..eef1b5f 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -21,6 +21,7 @@
package config
import (
+ "fmt"
"os"
"strconv"
@@ -39,16 +40,19 @@
func New() *Config {
return &Config{
- LogLevel: getLogLevel(),
InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""),
InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://enrichmentservice:8434"),
DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
- ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "configs/producer.crt"),
- ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "configs/producer.key"),
+ ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
+ ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
+ LogLevel: getLogLevel(),
}
}
+func (c Config) String() string {
+ return fmt.Sprintf("InfoProducerHost: %v, InfoProducerPort: %v, InfoCoordinatorAddress: %v, DMaaPMRAddress: %v, ProducerCertPath: %v, ProducerKeyPath: %v, LogLevel: %v", c.InfoProducerHost, c.InfoProducerPort, c.InfoCoordinatorAddress, c.DMaaPMRAddress, c.ProducerCertPath, c.ProducerKeyPath, c.LogLevel)
+}
func getEnv(key string, defaultVal string) string {
if value, exists := os.LookupEnv(key); exists {
return value
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 9420a2a..90d3c03 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -72,8 +72,8 @@
InfoProducerPort: 8085,
InfoCoordinatorAddress: "https://enrichmentservice:8434",
DMaaPMRAddress: "https://message-router.onap:3905",
- ProducerCertPath: "configs/producer.crt",
- ProducerKeyPath: "configs/producer.key",
+ ProducerCertPath: "security/producer.crt",
+ ProducerKeyPath: "security/producer.key",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -99,8 +99,8 @@
InfoProducerPort: 8085,
InfoCoordinatorAddress: "https://enrichmentservice:8434",
DMaaPMRAddress: "https://message-router.onap:3905",
- ProducerCertPath: "configs/producer.crt",
- ProducerKeyPath: "configs/producer.key",
+ ProducerCertPath: "security/producer.crt",
+ ProducerKeyPath: "security/producer.key",
}
got := New()
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
index 7c762d9..8ccd4b2 100644
--- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
@@ -22,9 +22,15 @@
import (
"bytes"
+ "crypto/tls"
"fmt"
"io"
+ "math"
"net/http"
+ "net/url"
+ "time"
+
+ "github.com/hashicorp/go-retryablehttp"
)
// HTTPClient interface
@@ -98,3 +104,44 @@
}
return putError
}
+
+func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate, error) {
+ cert, err := tls.LoadX509KeyPair(certPath, keyPath)
+ if err != nil {
+ return tls.Certificate{}, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", certPath, keyPath, err)
+ }
+ return cert, nil
+}
+
+func CreateRetryClient(cert tls.Certificate) *http.Client {
+ rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.RetryWaitMax = time.Minute
+ rawRetryClient.RetryMax = math.MaxInt
+ rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
+
+ client := rawRetryClient.StandardClient()
+ return client
+}
+
+func CreateClientWithoutRetry(cert tls.Certificate, timeout time.Duration) *http.Client {
+ return &http.Client{
+ Timeout: timeout,
+ Transport: getSecureTransportWithoutVerify(cert),
+ }
+}
+
+func getSecureTransportWithoutVerify(cert tls.Certificate) *http.Transport {
+ return &http.Transport{
+ TLSClientConfig: &tls.Config{
+ Certificates: []tls.Certificate{
+ cert,
+ },
+ InsecureSkipVerify: true,
+ },
+ }
+}
+
+func IsUrlSecure(configUrl string) bool {
+ u, err := url.Parse(configUrl)
+ return err == nil && u.Scheme == "https"
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
index 3ded9be..20c26dd 100644
--- a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
@@ -22,12 +22,17 @@
import (
"bytes"
+ "crypto/tls"
"errors"
"fmt"
"io/ioutil"
+ "math"
"net/http"
+ "reflect"
"testing"
+ "time"
+ "github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
@@ -202,3 +207,73 @@
})
}
}
+
+func Test_createClientCertificate(t *testing.T) {
+ assertions := require.New(t)
+ wantedCert, _ := tls.LoadX509KeyPair("../../security/producer.crt", "../../security/producer.key")
+ type args struct {
+ certPath string
+ keyPath string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantCert tls.Certificate
+ wantErr error
+ }{
+ {
+ name: "Paths to cert info ok should return certificate",
+ args: args{
+ certPath: "../../security/producer.crt",
+ keyPath: "../../security/producer.key",
+ },
+ wantCert: wantedCert,
+ },
+ {
+ name: "Paths to cert info not ok should return error with info about error",
+ args: args{
+ certPath: "wrong_cert",
+ keyPath: "wrong_key",
+ },
+ wantErr: fmt.Errorf("cannot create x509 keypair from cert file wrong_cert and key file wrong_key due to: open wrong_cert: no such file or directory"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ cert, err := CreateClientCertificate(tt.args.certPath, tt.args.keyPath)
+ assertions.Equal(tt.wantCert, cert, tt.name)
+ assertions.Equal(tt.wantErr, err, tt.name)
+ })
+ }
+}
+
+func Test_CreateRetryClient(t *testing.T) {
+ assertions := require.New(t)
+
+ client := CreateRetryClient(tls.Certificate{})
+
+ transport := client.Transport
+ assertions.Equal("*retryablehttp.RoundTripper", reflect.TypeOf(transport).String())
+ retryableTransport := transport.(*retryablehttp.RoundTripper)
+ retryableClient := retryableTransport.Client
+ assertions.Equal(time.Minute, retryableClient.RetryWaitMax)
+ assertions.Equal(math.MaxInt, retryableClient.RetryMax)
+}
+
+func Test_CreateClientWithoutRetry(t *testing.T) {
+ assertions := require.New(t)
+
+ client := CreateClientWithoutRetry(tls.Certificate{}, 5*time.Second)
+
+ transport := client.Transport
+ assertions.Equal("*http.Transport", reflect.TypeOf(transport).String())
+ assertions.Equal(5*time.Second, client.Timeout)
+}
+
+func TestIsUrlSecured(t *testing.T) {
+ assertions := require.New(t)
+
+ assertions.True(IsUrlSecure("https://url"))
+
+ assertions.False(IsUrlSecure("http://url"))
+}
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 380087f..74f4edf 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -26,7 +26,6 @@
"net/http"
"time"
- "github.com/hashicorp/go-retryablehttp"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
@@ -43,21 +42,21 @@
func main() {
log.SetLevel(configuration.LogLevel)
log.Debug("Initializing DMaaP Mediator Producer")
+ log.Debug("Using configuration: ", configuration)
if err := validateConfiguration(configuration); err != nil {
log.Fatalf("Stopping producer due to error: %v", err)
}
callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
- var retryClient restclient.HTTPClient
- if cert, err := createClientCertificate(); err == nil {
- retryClient = createRetryClient(cert)
+ var cert tls.Certificate
+ if c, err := restclient.CreateClientCertificate(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
+ cert = c
} else {
log.Fatalf("Stopping producer due to error: %v", err)
}
+ retryClient := restclient.CreateRetryClient(cert)
- jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, &http.Client{
- Timeout: time.Second * 5,
- })
+ jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 5*time.Second))
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
@@ -67,7 +66,11 @@
go func() {
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
r := server.NewRouter(jobsManager)
- log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ if restclient.IsUrlSecure(callbackAddress) {
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ } else {
+ log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
+ }
}()
keepProducerAlive()
@@ -82,34 +85,9 @@
}
return nil
}
-
-func createClientCertificate() (*tls.Certificate, error) {
- if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
- return &cert, nil
- } else {
- return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", configuration.ProducerCertPath, configuration.ProducerKeyPath, err)
- }
-}
-
-func createRetryClient(cert *tls.Certificate) *http.Client {
- rawRetryClient := retryablehttp.NewClient()
- rawRetryClient.RetryWaitMax = time.Minute
- rawRetryClient.RetryMax = int(^uint(0) >> 1)
- rawRetryClient.HTTPClient.Transport = &http.Transport{
- TLSClientConfig: &tls.Config{
- Certificates: []tls.Certificate{
- *cert,
- },
- InsecureSkipVerify: true,
- },
- }
-
- return rawRetryClient.StandardClient()
-}
-
-func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
- if types, err := jobHandler.LoadTypesFromConfiguration(); err == nil {
+ if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil {
if regErr := registrator.RegisterTypes(types); regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
}
@@ -118,7 +96,7 @@
}
producer := config.ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
- SupportedInfoTypes: jobHandler.GetSupportedTypes(),
+ SupportedInfoTypes: jobTypesHandler.GetSupportedTypes(),
InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
}
if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
diff --git a/dmaap-mediator-producer/configs/producer.crt b/dmaap-mediator-producer/security/producer.crt
similarity index 100%
rename from dmaap-mediator-producer/configs/producer.crt
rename to dmaap-mediator-producer/security/producer.crt
diff --git a/dmaap-mediator-producer/configs/producer.key b/dmaap-mediator-producer/security/producer.key
similarity index 100%
rename from dmaap-mediator-producer/configs/producer.key
rename to dmaap-mediator-producer/security/producer.key
diff --git a/dmaap-mediator-producer/stub/dmaap/mrstub.go b/dmaap-mediator-producer/stub/dmaap/mrstub.go
index 0ec38cc..82ae08d 100644
--- a/dmaap-mediator-producer/stub/dmaap/mrstub.go
+++ b/dmaap-mediator-producer/stub/dmaap/mrstub.go
@@ -57,7 +57,7 @@
http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData)
fmt.Print("Starting mr on port: ", *port)
- http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../configs/producer.crt", "../../configs/producer.key", nil)
+ http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../security/producer.crt", "../../security/producer.key", nil)
}