blob: 47e12e9af0905d73090ce70161386a9ce6938813 [file] [log] [blame]
elinuxhenrikcce95ff2021-09-05 17:27:02 +02001// -
2// ========================LICENSE_START=================================
3// O-RAN-SC
4// %%
5// Copyright (C) 2021: Nordix Foundation
6// %%
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License at
10//
11// http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing, software
14// distributed under the License is distributed on an "AS IS" BASIS,
15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16// See the License for the specific language governing permissions and
17// limitations under the License.
18// ========================LICENSE_END===================================
19//
20
21package main
22
23import (
elinuxhenrikba96d842021-09-06 16:05:01 +020024 "fmt"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020025 "sync"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020026
27 log "github.com/sirupsen/logrus"
28 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
elinuxhenrik63a42ca2021-09-06 22:16:24 +020029 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
elinuxhenrikba96d842021-09-06 16:05:01 +020030 "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020031)
32
33var configuration *config.Config
elinuxhenrikba96d842021-09-06 16:05:01 +020034var supervisionCallbackAddress string
35var jobInfoCallbackAddress string
elinuxhenrikcce95ff2021-09-05 17:27:02 +020036
37func init() {
38 configuration = config.New()
39 if loglevel, err := log.ParseLevel(configuration.LogLevel); err == nil {
40 log.SetLevel(loglevel)
41 } else {
42 log.Warnf("Invalid log level: %v. Log level will be Info!", configuration.LogLevel)
43 }
44
45 log.Debug("Initializing DMaaP Mediator Producer")
elinuxhenrikba96d842021-09-06 16:05:01 +020046 if configuration.InfoProducerSupervisionCallbackHost == "" {
47 log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
elinuxhenrika77cd652021-09-06 10:56:21 +020048 }
elinuxhenrikba96d842021-09-06 16:05:01 +020049 supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort)
50
51 if configuration.InfoJobCallbackHost == "" {
52 log.Fatal("Missing INFO_JOB_CALLBACK_HOST")
elinuxhenrikcce95ff2021-09-05 17:27:02 +020053 }
elinuxhenrikba96d842021-09-06 16:05:01 +020054 jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
elinuxhenrikcce95ff2021-09-05 17:27:02 +020055
56 registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020057 if types, err := jobs.GetTypes(); err == nil {
elinuxhenrikcce95ff2021-09-05 17:27:02 +020058 if regErr := registrator.RegisterTypes(types); regErr != nil {
59 log.Fatalf("Unable to register all types due to: %v", regErr)
60 }
61 } else {
62 log.Fatalf("Unable to get types to register due to: %v", err)
63 }
elinuxhenrika77cd652021-09-06 10:56:21 +020064 producer := config.ProducerRegistrationInfo{
elinuxhenrikba96d842021-09-06 16:05:01 +020065 InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
elinuxhenrik63a42ca2021-09-06 22:16:24 +020066 SupportedInfoTypes: jobs.GetSupportedTypes(),
elinuxhenrikba96d842021-09-06 16:05:01 +020067 InfoJobCallbackUrl: jobInfoCallbackAddress,
elinuxhenrika77cd652021-09-06 10:56:21 +020068 }
69 if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
70 log.Fatalf("Unable to register producer due to: %v", err)
71 }
elinuxhenrikcce95ff2021-09-05 17:27:02 +020072}
73
74func main() {
75 log.Debug("Starting DMaaP Mediator Producer")
elinuxhenrik63a42ca2021-09-06 22:16:24 +020076 wg := new(sync.WaitGroup)
elinuxhenrikba96d842021-09-06 16:05:01 +020077
elinuxhenrik63a42ca2021-09-06 22:16:24 +020078 // add two goroutines to `wg` WaitGroup, one for each avilable server
elinuxhenrik28038562021-09-21 15:43:11 +020079 wg.Add(3)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020080
81 log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
82 go func() {
83 server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
84 log.Warn(server.ListenAndServe())
85 wg.Done()
86 }()
87
88 go func() {
89 server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
90 log.Warn(server.ListenAndServe())
91 wg.Done()
92 }()
93
elinuxhenrik28038562021-09-21 15:43:11 +020094 go func() {
95 jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
96 wg.Done()
97 }()
98
elinuxhenrik63a42ca2021-09-06 22:16:24 +020099 // wait until WaitGroup is done
100 wg.Wait()
elinuxhenrikba96d842021-09-06 16:05:01 +0200101 log.Debug("Stopping DMaaP Mediator Producer")
elinuxhenrikcce95ff2021-09-05 17:27:02 +0200102}