Status handler for DMaaP mediator producer

Issue-ID: NONRTRIC-595
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ia97075ba6532c040659e7781ee6e753511d7b91a
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index a3e3a11..764e89c 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -25,10 +25,12 @@
 )
 
 type Config struct {
-	LogLevel                           string
-	InfoJobCallbackUrl                 string
-	InfoCoordinatorAddress             string
-	InfoProducerSupervisionCallbackUrl string
+	LogLevel                            string
+	InfoProducerSupervisionCallbackHost string
+	InfoProducerSupervisionCallbackPort string
+	InfoJobCallbackHost                 string
+	InfoJobCallbackPort                 string
+	InfoCoordinatorAddress              string
 }
 
 type ProducerRegistrationInfo struct {
@@ -39,10 +41,12 @@
 
 func New() *Config {
 	return &Config{
-		LogLevel:                           getEnv("LOG_LEVEL", "Info"),
-		InfoJobCallbackUrl:                 getEnv("INFO_JOB_CALLBACK_URL", ""),
-		InfoCoordinatorAddress:             getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-		InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""),
+		LogLevel:                            getEnv("LOG_LEVEL", "Info"),
+		InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
+		InfoProducerSupervisionCallbackPort: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8085"),
+		InfoJobCallbackHost:                 getEnv("INFO_JOB_CALLBACK_HOST", ""),
+		InfoJobCallbackPort:                 getEnv("INFO_JOB_CALLBACK_PORT", "8086"),
+		InfoCoordinatorAddress:              getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
 	}
 }
 
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 88ba1d8..6b02e42 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -28,15 +28,19 @@
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
 	os.Setenv("LOG_LEVEL", "Debug")
-	os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl")
+	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
+	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "supervisionCallbackPort")
+	os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
+	os.Setenv("INFO_JOB_CALLBACK_PORT", "jobCallbackPort")
 	os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
-	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl")
 	defer os.Clearenv()
 	wantConfig := Config{
-		LogLevel:                           "Debug",
-		InfoJobCallbackUrl:                 "jobCallbackUrl",
-		InfoCoordinatorAddress:             "infoCoordAddr",
-		InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
+		LogLevel:                            "Debug",
+		InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
+		InfoProducerSupervisionCallbackPort: "supervisionCallbackPort",
+		InfoJobCallbackHost:                 "jobCallbackHost",
+		InfoJobCallbackPort:                 "jobCallbackPort",
+		InfoCoordinatorAddress:              "infoCoordAddr",
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -45,10 +49,12 @@
 
 func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
 	wantConfig := Config{
-		LogLevel:                           "Info",
-		InfoJobCallbackUrl:                 "",
-		InfoCoordinatorAddress:             "http://enrichmentservice:8083",
-		InfoProducerSupervisionCallbackUrl: "",
+		LogLevel:                            "Info",
+		InfoProducerSupervisionCallbackHost: "",
+		InfoProducerSupervisionCallbackPort: "8085",
+		InfoJobCallbackHost:                 "",
+		InfoJobCallbackPort:                 "8086",
+		InfoCoordinatorAddress:              "http://enrichmentservice:8083",
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
new file mode 100644
index 0000000..ca30d73
--- /dev/null
+++ b/dmaap-mediator-producer/internal/server/server.go
@@ -0,0 +1,40 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package server
+
+import (
+	"fmt"
+	"net/http"
+)
+
+func StatusHandler(w http.ResponseWriter, r *http.Request) {
+	if r.URL.Path != "/" {
+		http.Error(w, "404 not found.", http.StatusNotFound)
+		return
+	}
+
+	if r.Method != "GET" {
+		http.Error(w, "Method is not supported.", http.StatusNotFound)
+		return
+	}
+
+	fmt.Fprintf(w, "All is well!")
+}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
new file mode 100644
index 0000000..9213216
--- /dev/null
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -0,0 +1,90 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package server
+
+import (
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestStatusHandler(t *testing.T) {
+	assertions := require.New(t)
+	type args struct {
+		responseRecorder *httptest.ResponseRecorder
+		r                *http.Request
+	}
+	tests := []struct {
+		name         string
+		args         args
+		wantedStatus int
+		wantedBody   string
+	}{
+		{
+			name: "StatusHandler with correct path and method, should return OK",
+			args: args{
+				responseRecorder: httptest.NewRecorder(),
+				r:                newRequest("GET", "/", nil, t),
+			},
+			wantedStatus: http.StatusOK,
+			wantedBody:   "All is well!",
+		},
+		{
+			name: "StatusHandler with incorrect path, should return NotFound",
+			args: args{
+				responseRecorder: httptest.NewRecorder(),
+				r:                newRequest("GET", "/wrong", nil, t),
+			},
+			wantedStatus: http.StatusNotFound,
+			wantedBody:   "404 not found.\n",
+		},
+		{
+			name: "StatusHandler with incorrect method, should return NotFound",
+			args: args{
+				responseRecorder: httptest.NewRecorder(),
+				r:                newRequest("PUT", "/", nil, t),
+			},
+			wantedStatus: http.StatusNotFound,
+			wantedBody:   "Method is not supported.\n",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			handler := http.HandlerFunc(StatusHandler)
+			handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
+			assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+
+			assertions.Equal(tt.wantedBody, tt.args.responseRecorder.Body.String())
+		})
+	}
+}
+
+func newRequest(method string, url string, body io.Reader, t *testing.T) *http.Request {
+	if req, err := http.NewRequest(method, url, body); err == nil {
+		return req
+	} else {
+		t.Fatalf("Could not create request due to: %v", err)
+		return nil
+	}
+}
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index d38496f..b357f69 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -21,14 +21,18 @@
 package main
 
 import (
-	"time"
+	"fmt"
+	"net/http"
 
 	log "github.com/sirupsen/logrus"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
 var configuration *config.Config
+var supervisionCallbackAddress string
+var jobInfoCallbackAddress string
 
 func init() {
 	configuration = config.New()
@@ -39,12 +43,15 @@
 	}
 
 	log.Debug("Initializing DMaaP Mediator Producer")
-	if configuration.InfoJobCallbackUrl == "" {
-		log.Fatal("Missing INFO_JOB_CALLBACK_URL")
+	if configuration.InfoProducerSupervisionCallbackHost == "" {
+		log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
 	}
-	if configuration.InfoProducerSupervisionCallbackUrl == "" {
-		log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_URL")
+	supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort)
+
+	if configuration.InfoJobCallbackHost == "" {
+		log.Fatal("Missing INFO_JOB_CALLBACK_HOST")
 	}
+	jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
 
 	registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
 	if types, err := jobtypes.GetTypes(); err == nil {
@@ -55,9 +62,9 @@
 		log.Fatalf("Unable to get types to register due to: %v", err)
 	}
 	producer := config.ProducerRegistrationInfo{
-		InfoProducerSupervisionCallbackUrl: configuration.InfoProducerSupervisionCallbackUrl,
+		InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
 		SupportedInfoTypes:                 jobtypes.GetSupportedTypes(),
-		InfoJobCallbackUrl:                 configuration.InfoJobCallbackUrl,
+		InfoJobCallbackUrl:                 jobInfoCallbackAddress,
 	}
 	if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
 		log.Fatalf("Unable to register producer due to: %v", err)
@@ -66,5 +73,11 @@
 
 func main() {
 	log.Debug("Starting DMaaP Mediator Producer")
-	time.Sleep(1000 * time.Millisecond)
+	log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
+	http.HandleFunc("/", server.StatusHandler)
+
+	if err := http.ListenAndServe(":"+configuration.InfoProducerSupervisionCallbackPort, nil); err != nil {
+		log.Fatal(err)
+	}
+	log.Debug("Stopping DMaaP Mediator Producer")
 }