blob: 19851be5c8c183658f4e601c9aad99c33e224f05 [file] [log] [blame]
elinuxhenrik6b45b052022-01-12 16:12:45 +01001// -
2// ========================LICENSE_START=================================
3// O-RAN-SC
4// %%
5// Copyright (C) 2022: Nordix Foundation
6// %%
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License at
10//
11// http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing, software
14// distributed under the License is distributed on an "AS IS" BASIS,
15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16// See the License for the specific language governing permissions and
17// limitations under the License.
18// ========================LICENSE_END===================================
19//
20
21package main
22
23import (
24 "bytes"
elinuxhenrik2a6b1312022-01-19 16:51:36 +010025 "fmt"
elinuxhenrik6b45b052022-01-12 16:12:45 +010026 "io/ioutil"
27 "net/http"
28 "os/exec"
29 "sync"
30 "testing"
31 "time"
32
33 "github.com/stretchr/testify/require"
34 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
35 "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
36 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
37)
38
39// This is not a real test, just a way to get the Swagger documentation generated automatically.
40// Hence there are no assertions in this test.
41func TestGenerateSwaggerDocs(t *testing.T) {
42 cmd := exec.Command("./generate_swagger_docs.sh")
43
elinuxhenrik2a6b1312022-01-19 16:51:36 +010044 err := cmd.Run()
45 if err != nil {
46 fmt.Println("Error generating Swagger:", err)
47 }
elinuxhenrik6b45b052022-01-12 16:12:45 +010048}
49
50func TestValidateConfiguration(t *testing.T) {
51 assertions := require.New(t)
52
53 validConfig := config.Config{
54 InfoProducerHost: "host",
55 DMaaPMRAddress: "address",
56 KafkaBootstrapServers: "servers",
57 ProducerCertPath: "path",
58 ProducerKeyPath: "path",
59 }
60 assertions.Nil(validateConfiguration(&validConfig))
61
62 missingProducerHost := config.Config{
63 DMaaPMRAddress: "address",
64 KafkaBootstrapServers: "servers",
65 ProducerCertPath: "path",
66 ProducerKeyPath: "path",
67 }
68 assertions.Contains(validateConfiguration(&missingProducerHost).Error(), "INFO_PRODUCER_HOST")
69
70 missingCert := config.Config{
71 InfoProducerHost: "host",
72 DMaaPMRAddress: "address",
73 KafkaBootstrapServers: "servers",
74 ProducerKeyPath: "path",
75 }
76 assertions.Contains(validateConfiguration(&missingCert).Error(), "PRODUCER_CERT")
77
78 missingCertKey := config.Config{
79 InfoProducerHost: "host",
80 DMaaPMRAddress: "address",
81 KafkaBootstrapServers: "servers",
82 ProducerCertPath: "path",
83 }
84 assertions.Contains(validateConfiguration(&missingCertKey).Error(), "PRODUCER_KEY")
85
86 missingMRAddress := config.Config{
87 InfoProducerHost: "host",
88 KafkaBootstrapServers: "servers",
89 ProducerCertPath: "path",
90 ProducerKeyPath: "path",
91 }
92 assertions.Nil(validateConfiguration(&missingMRAddress))
93
94 missingKafkaServers := config.Config{
95 InfoProducerHost: "host",
96 DMaaPMRAddress: "address",
97 ProducerCertPath: "path",
98 ProducerKeyPath: "path",
99 }
100 assertions.Nil(validateConfiguration(&missingKafkaServers))
101
102 missingMRAddressdAndKafkaServers := config.Config{
103 InfoProducerHost: "host",
104 ProducerCertPath: "path",
105 ProducerKeyPath: "path",
106 }
107 assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "DMAAP_MR_ADDR")
108 assertions.Contains(validateConfiguration(&missingMRAddressdAndKafkaServers).Error(), "KAFKA_BOOTSRAP_SERVERS")
109}
110
111func TestRegisterTypesAndProducer(t *testing.T) {
112 assertions := require.New(t)
113
114 wg := sync.WaitGroup{}
115 clientMock := NewTestClient(func(req *http.Request) *http.Response {
116 if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/STD_Fault_Messages" {
117 assertions.Equal(req.Method, "PUT")
118 body := getBodyAsString(req, t)
119 assertions.Contains(body, "info_job_data_schema")
120 assertions.Equal("application/json", req.Header.Get("Content-Type"))
121 wg.Done()
122 return &http.Response{
123 StatusCode: 200,
124 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
125 Header: make(http.Header), // Must be set to non-nil value or it panics
126 }
127 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-types/Kafka_TestTopic" {
128 assertions.Equal(req.Method, "PUT")
129 body := getBodyAsString(req, t)
130 assertions.Contains(body, "info_job_data_schema")
131 assertions.Equal("application/json", req.Header.Get("Content-Type"))
132 wg.Done()
133 return &http.Response{
134 StatusCode: 200,
135 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
136 Header: make(http.Header), // Must be set to non-nil value or it panics
137 }
138 } else if req.URL.String() == configuration.InfoCoordinatorAddress+"/data-producer/v1/info-producers/DMaaP_Mediator_Producer" {
139 assertions.Equal(req.Method, "PUT")
140 body := getBodyAsString(req, t)
141 assertions.Contains(body, "callbackAddress/health_check")
142 assertions.Contains(body, "callbackAddress/info_job")
143 assertions.Contains(body, "Kafka_TestTopic")
144 assertions.Contains(body, "STD_Fault_Messages")
145 assertions.Equal("application/json", req.Header.Get("Content-Type"))
146 wg.Done()
147 return &http.Response{
148 StatusCode: 200,
149 Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
150 Header: make(http.Header), // Must be set to non-nil value or it panics
151 }
152 }
153 t.Error("Wrong call to client: ", req)
154 t.Fail()
155 return nil
156 })
157 jobsManager := jobs.NewJobsManagerImpl(clientMock, configuration.DMaaPMRAddress, kafkaclient.KafkaFactoryImpl{}, nil)
158
159 wg.Add(3)
160 err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, "callbackAddress", clientMock)
161
162 assertions.Nil(err)
163
164 if waitTimeout(&wg, 2*time.Second) {
165 t.Error("Not all calls to server were made")
166 t.Fail()
167 }
168}
169
170type RoundTripFunc func(req *http.Request) *http.Response
171
172func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
173 return f(req), nil
174}
175
176//NewTestClient returns *http.Client with Transport replaced to avoid making real calls
177func NewTestClient(fn RoundTripFunc) *http.Client {
178 return &http.Client{
179 Transport: RoundTripFunc(fn),
180 }
181}
182
183func getBodyAsString(req *http.Request, t *testing.T) string {
184 buf := new(bytes.Buffer)
185 if _, err := buf.ReadFrom(req.Body); err != nil {
186 t.Fail()
187 }
188 return buf.String()
189}
190
191// waitTimeout waits for the waitgroup for the specified max timeout.
192// Returns true if waiting timed out.
193func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
194 c := make(chan struct{})
195 go func() {
196 defer close(c)
197 wg.Wait()
198 }()
199 select {
200 case <-c:
201 return false // completed normally
202 case <-time.After(timeout):
203 return true // timed out
204 }
205}