blob: 4dcf22b19081caf383785f351d58ae8681d8ccfc [file] [log] [blame]
Mohamed Abukar5120ec12020-02-04 11:01:24 +02001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package xapp
21
22import (
Mohamed Abukar5953f7e2020-04-02 10:08:14 +030023 "bytes"
Mohamed Abukar9568a2d2020-02-18 16:50:32 +020024 "encoding/json"
25 "fmt"
Mohamed Abukar5120ec12020-02-04 11:01:24 +020026 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
Mohamed Abukar5953f7e2020-04-02 10:08:14 +030030 "github.com/spf13/viper"
Mohamed Abukar9568a2d2020-02-18 16:50:32 +020031 "io/ioutil"
Mohamed Abukar47faec82020-04-09 22:08:19 +030032 "net"
Mohamed Abukar9568a2d2020-02-18 16:50:32 +020033 "net/http"
rangajalfd347862020-11-03 20:35:40 +030034 "os"
Mohamed Abukar5120ec12020-02-04 11:01:24 +020035 "time"
36
37 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
Mohamed Abukar429da192020-02-26 16:46:34 +020038 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
Mohamed Abukar5120ec12020-02-04 11:01:24 +020039 apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
40 apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
41 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
42
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
45 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
Mohamed Abukar429da192020-02-26 16:46:34 +020046 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
Mohamed Abukar5120ec12020-02-04 11:01:24 +020047 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
Mohamed Abukar9568a2d2020-02-18 16:50:32 +020048 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
Mohamed Abukar5120ec12020-02-04 11:01:24 +020049 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
50)
51
Mohamed Abukarb6341a52020-03-23 08:55:05 +020052type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
Mohamed Abukar9568a2d2020-02-18 16:50:32 +020053type SubscriptionQueryHandler func() (models.SubscriptionList, error)
Mohamed Abukar429da192020-02-26 16:46:34 +020054type SubscriptionDeleteHandler func(string) error
Mohamed Abukar5953f7e2020-04-02 10:08:14 +030055type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
Mohamed Abukar5120ec12020-02-04 11:01:24 +020056
57type Subscriber struct {
58 localAddr string
59 localPort int
60 remoteHost string
61 remoteUrl string
62 remoteProt []string
63 timeout time.Duration
Mohamed Abukar5953f7e2020-04-02 10:08:14 +030064 clientUrl string
65 clientCB SubscriptionResponseCallback
Mohamed Abukar5120ec12020-02-04 11:01:24 +020066}
67
68func NewSubscriber(host string, timo int) *Subscriber {
69 if host == "" {
rangajalfd347862020-11-03 20:35:40 +030070 pltnamespace := os.Getenv("PLT_NAMESPACE")
71 if pltnamespace == "" {
72 pltnamespace = "ricplt"
73 }
74 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
Mohamed Abukar5120ec12020-02-04 11:01:24 +020075 }
76
77 if timo == 0 {
78 timo = 20
79 }
80
Mohamed Abukar5953f7e2020-04-02 10:08:14 +030081 r := &Subscriber{
Mohamed Abukar5120ec12020-02-04 11:01:24 +020082 remoteHost: host,
83 remoteUrl: "/ric/v1",
84 remoteProt: []string{"http"},
85 timeout: time.Duration(timo) * time.Second,
86 localAddr: "0.0.0.0",
87 localPort: 8088,
Mohamed Abukar5953f7e2020-04-02 10:08:14 +030088 clientUrl: "/ric/v1/subscriptions/response",
89 }
90 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
91
92 return r
93}
94
95func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
96 if req.Body != nil {
97 var resp apimodel.SubscriptionResponse
98 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
99 if r.clientCB != nil {
100 r.clientCB(&resp)
101 }
102 }
103 req.Body.Close()
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200104 }
105}
106
107// Server interface: listen and receive subscription requests
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300108func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200109 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
110 if err != nil {
111 return err
112 }
113
114 api := operations.NewXappFrameworkAPI(swaggerSpec)
115
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300116 // Subscription: Query
Mohamed Abukar9568a2d2020-02-18 16:50:32 +0200117 api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
118 func(p query.GetAllSubscriptionsParams) middleware.Responder {
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300119 if resp, err := getSubscription(); err == nil {
Mohamed Abukar9568a2d2020-02-18 16:50:32 +0200120 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
121 }
122 return query.NewGetAllSubscriptionsInternalServerError()
123 })
124
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200125 // SubscriptionType: Report
126 api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
127 func(p report.SubscribeReportParams) middleware.Responder {
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300128 if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200129 return report.NewSubscribeReportCreated().WithPayload(resp)
130 }
131 return report.NewSubscribeReportInternalServerError()
132 })
133
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300134 // SubscriptionType: Policy
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200135 api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
136 func(p policy.SubscribePolicyParams) middleware.Responder {
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300137 if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200138 return policy.NewSubscribePolicyCreated().WithPayload(resp)
139 }
140 return policy.NewSubscribePolicyInternalServerError()
141 })
142
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300143 // SubscriptionType: Delete
Mohamed Abukar429da192020-02-26 16:46:34 +0200144 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
145 func(p common.UnsubscribeParams) middleware.Responder {
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300146 if err := delSubscription(p.SubscriptionID); err == nil {
Mohamed Abukar429da192020-02-26 16:46:34 +0200147 return common.NewUnsubscribeNoContent()
148 }
149 return common.NewUnsubscribeInternalServerError()
150 })
151
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200152 server := restapi.NewServer(api)
153 defer server.Shutdown()
154 server.Host = r.localAddr
155 server.Port = r.localPort
156
157 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
158 if err := server.Serve(); err != nil {
159 return err
160 }
161 return nil
162}
163
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300164// Server interface: send notification to client
165func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
166 respData, err := json.Marshal(resp)
167 if err != nil {
168 Logger.Error("json.Marshal failed: %v", err)
169 return err
170 }
171
Mohamed Abukar47faec82020-04-09 22:08:19 +0300172 ep, _, _ := net.SplitHostPort(clientEndpoint)
173 _, port, _ := net.SplitHostPort(viper.GetString("local.host"))
174 clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
Mohamed Abukar5953f7e2020-04-02 10:08:14 +0300175
176 retries := viper.GetInt("subscription.retryCount")
177 if retries == 0 {
178 retries = 10
179 }
180
181 delay := viper.GetInt("subscription.retryDelay")
182 if delay == 0 {
183 delay = 5
184 }
185
186 for i := 0; i < retries; i++ {
187 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
188 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
189 break
190 }
191
192 if err != nil {
193 Logger.Error("%v", err)
194 }
195 if r != nil && r.StatusCode != http.StatusOK {
196 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
197 }
198 time.Sleep(time.Duration(delay) * time.Second)
199 }
200
201 return err
202}
203
204// Subscription interface for xApp: Response callback
205func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
206 r.clientCB = c
207}
208
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200209// Subscription interface for xApp: REPORT
Mohamed Abukarb6341a52020-03-23 08:55:05 +0200210func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200211 params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
212 result, err := r.CreateTransport().Report.SubscribeReport(params)
213 if err != nil {
Mohamed Abukarb6341a52020-03-23 08:55:05 +0200214 return &apimodel.SubscriptionResponse{}, err
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200215 }
216
217 return result.Payload, err
218}
219
220// Subscription interface for xApp: POLICY
Mohamed Abukarb6341a52020-03-23 08:55:05 +0200221func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200222 params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
223 result, err := r.CreateTransport().Policy.SubscribePolicy(params)
224 if err != nil {
Mohamed Abukarb6341a52020-03-23 08:55:05 +0200225 return &apimodel.SubscriptionResponse{}, err
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200226 }
227
228 return result.Payload, err
229}
230
Mohamed Abukar429da192020-02-26 16:46:34 +0200231// Subscription interface for xApp: DELETE
232func (r *Subscriber) UnSubscribe(subId string) error {
233 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
234 _, err := r.CreateTransport().Common.Unsubscribe(params)
235
236 return err
237}
238
Mohamed Abukar9568a2d2020-02-18 16:50:32 +0200239// Subscription interface for xApp: QUERY
240func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
241 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
242 if err != nil {
243 return models.SubscriptionList{}, err
244 }
245
246 defer resp.Body.Close()
247
248 contents, err := ioutil.ReadAll(resp.Body)
249 if err != nil {
250 return models.SubscriptionList{}, err
251 }
252
253 subscriptions := models.SubscriptionList{}
254 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
255 if err != nil {
256 return models.SubscriptionList{}, err
257 }
258
259 return subscriptions, nil
260}
261
262func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
263 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
Mohamed Abukar5120ec12020-02-04 11:01:24 +0200264}