blob: 1dd0ad1c605f1167700f294b8a20f06443369d5c [file] [log] [blame]
elinuxhenrikcce95ff2021-09-05 17:27:02 +02001// -
2// ========================LICENSE_START=================================
3// O-RAN-SC
4// %%
5// Copyright (C) 2021: Nordix Foundation
6// %%
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License at
10//
11// http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing, software
14// distributed under the License is distributed on an "AS IS" BASIS,
15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16// See the License for the specific language governing permissions and
17// limitations under the License.
18// ========================LICENSE_END===================================
19//
20
21package config
22
23import (
elinuxhenrika77cd652021-09-06 10:56:21 +020024 "encoding/json"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020025 "fmt"
26 "net/url"
27
28 log "github.com/sirupsen/logrus"
29
elinuxhenrikcce95ff2021-09-05 17:27:02 +020030 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
31)
32
33const registerTypePath = "/data-producer/v1/info-types/"
elinuxhenrika77cd652021-09-06 10:56:21 +020034const registerProducerPath = "/data-producer/v1/info-producers/"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020035
elinuxhenrik65a53d22021-09-29 15:41:26 +020036type TypeDefinition struct {
elinuxhenrik6f5d3d12021-12-23 16:36:31 +010037 Identity string `json:"id"`
38 DMaaPTopicURL string `json:"dmaapTopicUrl"`
39 KafkaInputTopic string `json:"kafkaInputTopic"`
40 TypeSchema interface{}
41}
42
43func (td TypeDefinition) IsKafkaType() bool {
44 return td.KafkaInputTopic != ""
45}
46
47func (td TypeDefinition) IsDMaaPType() bool {
48 return td.DMaaPTopicURL != ""
elinuxhenrik65a53d22021-09-29 15:41:26 +020049}
50
51type ProducerRegistrationInfo struct {
52 InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"`
53 SupportedInfoTypes []string `json:"supported_info_types"`
54 InfoJobCallbackUrl string `json:"info_job_callback_url"`
55}
56
elinuxhenrikcce95ff2021-09-05 17:27:02 +020057type Registrator interface {
elinuxhenrik65a53d22021-09-29 15:41:26 +020058 RegisterTypes(types []TypeDefinition) error
elinuxhenrika77cd652021-09-06 10:56:21 +020059 RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
elinuxhenrikcce95ff2021-09-05 17:27:02 +020060}
61
62type RegistratorImpl struct {
63 infoCoordinatorAddress string
elinuxhenrik65a53d22021-09-29 15:41:26 +020064 httpClient restclient.HTTPClient
elinuxhenrikcce95ff2021-09-05 17:27:02 +020065}
66
elinuxhenrik65a53d22021-09-29 15:41:26 +020067func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *RegistratorImpl {
elinuxhenrikcce95ff2021-09-05 17:27:02 +020068 return &RegistratorImpl{
69 infoCoordinatorAddress: infoCoordAddr,
elinuxhenrik65a53d22021-09-29 15:41:26 +020070 httpClient: client,
elinuxhenrikcce95ff2021-09-05 17:27:02 +020071 }
72}
73
elinuxhenrik65a53d22021-09-29 15:41:26 +020074func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
elinuxhenrikcce95ff2021-09-05 17:27:02 +020075 for _, jobType := range jobTypes {
elinuxhenrik3287cf62022-01-10 14:15:31 +010076 s, _ := json.Marshal(jobType.TypeSchema)
77 body := fmt.Sprintf(`{"info_job_data_schema": %v}`, string(s))
elinuxhenrik6f5d3d12021-12-23 16:36:31 +010078 if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Identity), []byte(body), r.httpClient); error != nil {
elinuxhenrikcce95ff2021-09-05 17:27:02 +020079 return error
80 }
81 log.Debugf("Registered type: %v", jobType)
82 }
83 return nil
84}
elinuxhenrika77cd652021-09-06 10:56:21 +020085
86func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
87 if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
elinuxhenrik65a53d22021-09-29 15:41:26 +020088 if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body), r.httpClient); putErr != nil {
elinuxhenrika77cd652021-09-06 10:56:21 +020089 return putErr
90 }
91 log.Debugf("Registered producer: %v", producerId)
92 return nil
93 } else {
94 return marshalErr
95 }
96}