// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2021: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//
20
21package main
22
23import (
elinuxhenrikc4960f12021-10-28 16:27:57 +020024 "crypto/tls"
elinuxhenrikba96d842021-09-06 16:05:01 +020025 "fmt"
elinuxhenrik382870d2021-09-23 11:09:09 +020026 "net/http"
elinuxhenrik65a53d22021-09-29 15:41:26 +020027 "time"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020028
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020031 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
elinuxhenrik65a53d22021-09-29 15:41:26 +020032 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
elinuxhenrikba96d842021-09-06 16:05:01 +020033 "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020034)
35
36var configuration *config.Config
37
38func init() {
39 configuration = config.New()
elinuxhenrikcce95ff2021-09-05 17:27:02 +020040}
41
42func main() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020043 log.SetLevel(configuration.LogLevel)
44 log.Debug("Initializing DMaaP Mediator Producer")
elinuxhenrikbfce1942021-11-08 15:58:20 +010045 log.Debug("Using configuration: ", configuration)
elinuxhenrik65a53d22021-09-29 15:41:26 +020046 if err := validateConfiguration(configuration); err != nil {
47 log.Fatalf("Stopping producer due to error: %v", err)
48 }
49 callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
50
elinuxhenrikbfce1942021-11-08 15:58:20 +010051 var cert tls.Certificate
52 if c, err := restclient.CreateClientCertificate(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
53 cert = c
elinuxhenrikc4960f12021-10-28 16:27:57 +020054 } else {
55 log.Fatalf("Stopping producer due to error: %v", err)
elinuxhenrik65a53d22021-09-29 15:41:26 +020056 }
elinuxhenrikbfce1942021-11-08 15:58:20 +010057 retryClient := restclient.CreateRetryClient(cert)
elinuxhenrik803e81a2021-10-26 13:22:55 +020058
elinuxhenrikf4969902021-11-24 15:53:24 +010059 jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
elinuxhenrik6e0d5842021-11-04 10:29:02 +010060 if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
elinuxhenrik65a53d22021-09-29 15:41:26 +020061 log.Fatalf("Stopping producer due to: %v", err)
62 }
elinuxhenrik036962c2021-11-17 15:46:07 +010063 jobsManager.StartJobsForAllTypes()
elinuxhenrik65a53d22021-09-29 15:41:26 +020064
elinuxhenrikcce95ff2021-09-05 17:27:02 +020065 log.Debug("Starting DMaaP Mediator Producer")
elinuxhenrik63a42ca2021-09-06 22:16:24 +020066 go func() {
elinuxhenrikc4960f12021-10-28 16:27:57 +020067 log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
elinuxhenrik6e0d5842021-11-04 10:29:02 +010068 r := server.NewRouter(jobsManager)
elinuxhenrikbfce1942021-11-08 15:58:20 +010069 if restclient.IsUrlSecure(callbackAddress) {
70 log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
71 } else {
72 log.Fatalf("Server stopped: %v", http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
73 }
elinuxhenrik63a42ca2021-09-06 22:16:24 +020074 }()
75
elinuxhenrikc4960f12021-10-28 16:27:57 +020076 keepProducerAlive()
elinuxhenrikcce95ff2021-09-05 17:27:02 +020077}
elinuxhenrik65a53d22021-09-29 15:41:26 +020078
79func validateConfiguration(configuration *config.Config) error {
80 if configuration.InfoProducerHost == "" {
81 return fmt.Errorf("missing INFO_PRODUCER_HOST")
82 }
elinuxhenrikc4960f12021-10-28 16:27:57 +020083 if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
84 return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
85 }
elinuxhenrik65a53d22021-09-29 15:41:26 +020086 return nil
87}
elinuxhenrikbfce1942021-11-08 15:58:20 +010088func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
elinuxhenrikc4960f12021-10-28 16:27:57 +020089 registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
elinuxhenrikf4969902021-11-24 15:53:24 +010090 configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
91 if err != nil {
92 return fmt.Errorf("unable to register all types due to: %v", err)
elinuxhenrik65a53d22021-09-29 15:41:26 +020093 }
elinuxhenrikf4969902021-11-24 15:53:24 +010094 regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes))
95 if regErr != nil {
96 return fmt.Errorf("unable to register all types due to: %v", regErr)
97 }
98
elinuxhenrik65a53d22021-09-29 15:41:26 +020099 producer := config.ProducerRegistrationInfo{
100 InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
elinuxhenrikbfce1942021-11-08 15:58:20 +0100101 SupportedInfoTypes: jobTypesHandler.GetSupportedTypes(),
elinuxhenrik65a53d22021-09-29 15:41:26 +0200102 InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
103 }
104 if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
105 return fmt.Errorf("unable to register producer due to: %v", err)
106 }
107 return nil
108}
elinuxhenrikc4960f12021-10-28 16:27:57 +0200109
// keepProducerAlive blocks the calling goroutine indefinitely; it never
// returns. main calls it last so the process keeps running after the callback
// server goroutine has been started.
func keepProducerAlive() {
	// An empty select has no cases that can ever proceed, so it blocks forever.
	select {}
}