blob: beeb995d9c0b329ef1d057f864ffd6e442a5256a [file] [log] [blame]
// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2021: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//
20
21package main
22
23import (
elinuxhenrikba96d842021-09-06 16:05:01 +020024 "fmt"
elinuxhenrik382870d2021-09-23 11:09:09 +020025 "net/http"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020026 "sync"
elinuxhenrik65a53d22021-09-29 15:41:26 +020027 "time"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020028
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020031 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
elinuxhenrik65a53d22021-09-29 15:41:26 +020032 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
elinuxhenrikba96d842021-09-06 16:05:01 +020033 "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020034)
35
// Request timeouts applied to the HTTP clients that main constructs.
const (
	// timeoutHTTPClient bounds each request made through the shared httpClient.
	timeoutHTTPClient = 5 * time.Second
	// timeoutPollClient bounds each request made through the poll client that
	// main hands to the job handler.
	timeoutPollClient = 15 * time.Second
)
38
elinuxhenrikcce95ff2021-09-05 17:27:02 +020039var configuration *config.Config
elinuxhenrik65a53d22021-09-29 15:41:26 +020040var httpClient restclient.HTTPClient
41var jobHandler *jobs.JobHandlerImpl
elinuxhenrikcce95ff2021-09-05 17:27:02 +020042
43func init() {
44 configuration = config.New()
elinuxhenrikcce95ff2021-09-05 17:27:02 +020045}
46
47func main() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020048 log.SetLevel(configuration.LogLevel)
49 log.Debug("Initializing DMaaP Mediator Producer")
50 if err := validateConfiguration(configuration); err != nil {
51 log.Fatalf("Stopping producer due to error: %v", err)
52 }
53 callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
54
55 httpClient = &http.Client{
56 Timeout: timeoutHTTPClient,
57 }
58 pollClient := &http.Client{
59 Timeout: timeoutPollClient,
60 }
61 jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient)
62 if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
63 log.Fatalf("Stopping producer due to: %v", err)
64 }
65
elinuxhenrikcce95ff2021-09-05 17:27:02 +020066 log.Debug("Starting DMaaP Mediator Producer")
elinuxhenrik63a42ca2021-09-06 22:16:24 +020067 wg := new(sync.WaitGroup)
elinuxhenrikba96d842021-09-06 16:05:01 +020068
elinuxhenrik382870d2021-09-23 11:09:09 +020069 // add two goroutines to `wg` WaitGroup, one for each running go routine
70 wg.Add(2)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020071
elinuxhenrik382870d2021-09-23 11:09:09 +020072 log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020073 go func() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020074 r := server.NewRouter(jobHandler)
elinuxhenrikfe61c612021-09-24 15:08:47 +020075 log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
elinuxhenrik63a42ca2021-09-06 22:16:24 +020076 wg.Done()
77 }()
78
elinuxhenrik28038562021-09-21 15:43:11 +020079 go func() {
elinuxhenrik65a53d22021-09-29 15:41:26 +020080 jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
elinuxhenrik28038562021-09-21 15:43:11 +020081 wg.Done()
82 }()
83
elinuxhenrik63a42ca2021-09-06 22:16:24 +020084 // wait until WaitGroup is done
85 wg.Wait()
elinuxhenrikba96d842021-09-06 16:05:01 +020086 log.Debug("Stopping DMaaP Mediator Producer")
elinuxhenrikcce95ff2021-09-05 17:27:02 +020087}
elinuxhenrik65a53d22021-09-29 15:41:26 +020088
89func validateConfiguration(configuration *config.Config) error {
90 if configuration.InfoProducerHost == "" {
91 return fmt.Errorf("missing INFO_PRODUCER_HOST")
92 }
93 return nil
94}
95
96func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
97 registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient)
98 if types, err := jobHandler.GetTypes(); err == nil {
99 if regErr := registrator.RegisterTypes(types); regErr != nil {
100 return fmt.Errorf("unable to register all types due to: %v", regErr)
101 }
102 } else {
103 return fmt.Errorf("unable to get types to register due to: %v", err)
104 }
105 producer := config.ProducerRegistrationInfo{
106 InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
107 SupportedInfoTypes: jobHandler.GetSupportedTypes(),
108 InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
109 }
110 if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
111 return fmt.Errorf("unable to register producer due to: %v", err)
112 }
113 return nil
114}