blob: 380087f3c495e185d6daeab9f9126b900a4cb050 [file] [log] [blame]
// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2021: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//
20
21package main
22
23import (
elinuxhenrikc4960f12021-10-28 16:27:57 +020024 "crypto/tls"
elinuxhenrikba96d842021-09-06 16:05:01 +020025 "fmt"
elinuxhenrik382870d2021-09-23 11:09:09 +020026 "net/http"
elinuxhenrik65a53d22021-09-29 15:41:26 +020027 "time"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020028
elinuxhenrik803e81a2021-10-26 13:22:55 +020029 "github.com/hashicorp/go-retryablehttp"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020030 log "github.com/sirupsen/logrus"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020032 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
elinuxhenrik65a53d22021-09-29 15:41:26 +020033 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
elinuxhenrikba96d842021-09-06 16:05:01 +020034 "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020035)
36
37var configuration *config.Config
38
39func init() {
40 configuration = config.New()
elinuxhenrikcce95ff2021-09-05 17:27:02 +020041}
42
43func main() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020044 log.SetLevel(configuration.LogLevel)
45 log.Debug("Initializing DMaaP Mediator Producer")
46 if err := validateConfiguration(configuration); err != nil {
47 log.Fatalf("Stopping producer due to error: %v", err)
48 }
49 callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
50
elinuxhenrikc4960f12021-10-28 16:27:57 +020051 var retryClient restclient.HTTPClient
52 if cert, err := createClientCertificate(); err == nil {
53 retryClient = createRetryClient(cert)
54 } else {
55 log.Fatalf("Stopping producer due to error: %v", err)
elinuxhenrik65a53d22021-09-29 15:41:26 +020056 }
elinuxhenrik803e81a2021-10-26 13:22:55 +020057
elinuxhenrik6e0d5842021-11-04 10:29:02 +010058 jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, &http.Client{
elinuxhenrikc4960f12021-10-28 16:27:57 +020059 Timeout: time.Second * 5,
60 })
elinuxhenrik6e0d5842021-11-04 10:29:02 +010061 if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
elinuxhenrik65a53d22021-09-29 15:41:26 +020062 log.Fatalf("Stopping producer due to: %v", err)
63 }
elinuxhenrik6e0d5842021-11-04 10:29:02 +010064 jobsManager.StartJobs()
elinuxhenrik65a53d22021-09-29 15:41:26 +020065
elinuxhenrikcce95ff2021-09-05 17:27:02 +020066 log.Debug("Starting DMaaP Mediator Producer")
elinuxhenrik63a42ca2021-09-06 22:16:24 +020067 go func() {
elinuxhenrikc4960f12021-10-28 16:27:57 +020068 log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
elinuxhenrik6e0d5842021-11-04 10:29:02 +010069 r := server.NewRouter(jobsManager)
elinuxhenrikc4960f12021-10-28 16:27:57 +020070 log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
elinuxhenrik63a42ca2021-09-06 22:16:24 +020071 }()
72
elinuxhenrikc4960f12021-10-28 16:27:57 +020073 keepProducerAlive()
elinuxhenrikcce95ff2021-09-05 17:27:02 +020074}
elinuxhenrik65a53d22021-09-29 15:41:26 +020075
76func validateConfiguration(configuration *config.Config) error {
77 if configuration.InfoProducerHost == "" {
78 return fmt.Errorf("missing INFO_PRODUCER_HOST")
79 }
elinuxhenrikc4960f12021-10-28 16:27:57 +020080 if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
81 return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
82 }
elinuxhenrik65a53d22021-09-29 15:41:26 +020083 return nil
84}
85
elinuxhenrikc4960f12021-10-28 16:27:57 +020086func createClientCertificate() (*tls.Certificate, error) {
87 if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
88 return &cert, nil
89 } else {
elinuxhenrik11b500e2021-11-02 09:19:26 +010090 return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", configuration.ProducerCertPath, configuration.ProducerKeyPath, err)
elinuxhenrikc4960f12021-10-28 16:27:57 +020091 }
92}
93
94func createRetryClient(cert *tls.Certificate) *http.Client {
95 rawRetryClient := retryablehttp.NewClient()
96 rawRetryClient.RetryWaitMax = time.Minute
97 rawRetryClient.RetryMax = int(^uint(0) >> 1)
98 rawRetryClient.HTTPClient.Transport = &http.Transport{
99 TLSClientConfig: &tls.Config{
100 Certificates: []tls.Certificate{
101 *cert,
102 },
103 InsecureSkipVerify: true,
104 },
105 }
106
107 return rawRetryClient.StandardClient()
108}
109
elinuxhenrik6e0d5842021-11-04 10:29:02 +0100110func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
elinuxhenrikc4960f12021-10-28 16:27:57 +0200111 registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
elinuxhenrik6e0d5842021-11-04 10:29:02 +0100112 if types, err := jobHandler.LoadTypesFromConfiguration(); err == nil {
elinuxhenrik65a53d22021-09-29 15:41:26 +0200113 if regErr := registrator.RegisterTypes(types); regErr != nil {
114 return fmt.Errorf("unable to register all types due to: %v", regErr)
115 }
116 } else {
117 return fmt.Errorf("unable to get types to register due to: %v", err)
118 }
119 producer := config.ProducerRegistrationInfo{
120 InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
121 SupportedInfoTypes: jobHandler.GetSupportedTypes(),
122 InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
123 }
124 if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
125 return fmt.Errorf("unable to register producer due to: %v", err)
126 }
127 return nil
128}
elinuxhenrikc4960f12021-10-28 16:27:57 +0200129
// keepProducerAlive blocks the calling goroutine forever and never returns.
// main calls it last so the process stays up while the callback server runs
// in its own goroutine.
func keepProducerAlive() {
	// An empty select has no case that can ever proceed.
	select {}
}