blob: 1a91af40463c99852a2d5999d81b713f81704257 [file] [log] [blame]
// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2021: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//
20
21package main
22
23import (
elinuxhenrikba96d842021-09-06 16:05:01 +020024 "fmt"
elinuxhenrik382870d2021-09-23 11:09:09 +020025 "net/http"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020026 "sync"
elinuxhenrik65a53d22021-09-29 15:41:26 +020027 "time"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020028
elinuxhenrik803e81a2021-10-26 13:22:55 +020029 "github.com/hashicorp/go-retryablehttp"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020030 log "github.com/sirupsen/logrus"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020032 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
elinuxhenrik65a53d22021-09-29 15:41:26 +020033 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
elinuxhenrikba96d842021-09-06 16:05:01 +020034 "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020035)
36
elinuxhenrik803e81a2021-10-26 13:22:55 +020037const timeoutDistributionClient = time.Second * 5
38const retryWaitMax = time.Minute
39const retryMax = int(^uint(0) >> 1)
elinuxhenrik65a53d22021-09-29 15:41:26 +020040
elinuxhenrikcce95ff2021-09-05 17:27:02 +020041var configuration *config.Config
elinuxhenrik803e81a2021-10-26 13:22:55 +020042var retryClient restclient.HTTPClient
elinuxhenrik65a53d22021-09-29 15:41:26 +020043var jobHandler *jobs.JobHandlerImpl
elinuxhenrikcce95ff2021-09-05 17:27:02 +020044
45func init() {
46 configuration = config.New()
elinuxhenrikcce95ff2021-09-05 17:27:02 +020047}
48
49func main() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020050 log.SetLevel(configuration.LogLevel)
51 log.Debug("Initializing DMaaP Mediator Producer")
52 if err := validateConfiguration(configuration); err != nil {
53 log.Fatalf("Stopping producer due to error: %v", err)
54 }
55 callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
56
elinuxhenrik803e81a2021-10-26 13:22:55 +020057 distributionClient := &http.Client{
58 Timeout: timeoutDistributionClient,
elinuxhenrik65a53d22021-09-29 15:41:26 +020059 }
elinuxhenrik803e81a2021-10-26 13:22:55 +020060
61 rawRetryClient := retryablehttp.NewClient()
62 rawRetryClient.RetryWaitMax = retryWaitMax
63 rawRetryClient.RetryMax = retryMax
64 retryClient = rawRetryClient.StandardClient()
65
66 jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient)
elinuxhenrik65a53d22021-09-29 15:41:26 +020067 if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
68 log.Fatalf("Stopping producer due to: %v", err)
69 }
70
elinuxhenrikcce95ff2021-09-05 17:27:02 +020071 log.Debug("Starting DMaaP Mediator Producer")
elinuxhenrik63a42ca2021-09-06 22:16:24 +020072 wg := new(sync.WaitGroup)
elinuxhenrikba96d842021-09-06 16:05:01 +020073
elinuxhenrik382870d2021-09-23 11:09:09 +020074 // add two goroutines to `wg` WaitGroup, one for each running go routine
75 wg.Add(2)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020076
elinuxhenrik382870d2021-09-23 11:09:09 +020077 log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020078 go func() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020079 r := server.NewRouter(jobHandler)
elinuxhenrikfe61c612021-09-24 15:08:47 +020080 log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
elinuxhenrik63a42ca2021-09-06 22:16:24 +020081 wg.Done()
82 }()
83
elinuxhenrik28038562021-09-21 15:43:11 +020084 go func() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020085 jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
elinuxhenrik28038562021-09-21 15:43:11 +020086 wg.Done()
87 }()
88
elinuxhenrik63a42ca2021-09-06 22:16:24 +020089 // wait until WaitGroup is done
90 wg.Wait()
elinuxhenrikba96d842021-09-06 16:05:01 +020091 log.Debug("Stopping DMaaP Mediator Producer")
elinuxhenrikcce95ff2021-09-05 17:27:02 +020092}
elinuxhenrik65a53d22021-09-29 15:41:26 +020093
94func validateConfiguration(configuration *config.Config) error {
95 if configuration.InfoProducerHost == "" {
96 return fmt.Errorf("missing INFO_PRODUCER_HOST")
97 }
98 return nil
99}
100
101func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
elinuxhenrik803e81a2021-10-26 13:22:55 +0200102 registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
elinuxhenrik65a53d22021-09-29 15:41:26 +0200103 if types, err := jobHandler.GetTypes(); err == nil {
104 if regErr := registrator.RegisterTypes(types); regErr != nil {
105 return fmt.Errorf("unable to register all types due to: %v", regErr)
106 }
107 } else {
108 return fmt.Errorf("unable to get types to register due to: %v", err)
109 }
110 producer := config.ProducerRegistrationInfo{
111 InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
112 SupportedInfoTypes: jobHandler.GetSupportedTypes(),
113 InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
114 }
115 if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
116 return fmt.Errorf("unable to register producer due to: %v", err)
117 }
118 return nil
119}