blob: 291075d81751fed487efd541a5bc787439c19715 [file] [log] [blame]
kalnagy45114752019-06-18 14:40:39 +02001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package control
21
22/*
23#include <rmr/RIC_message_types.h>
kalnagy93cc3e22019-09-19 11:29:29 +020024#include <rmr/rmr.h>
kalnagy45114752019-06-18 14:40:39 +020025
26#cgo CFLAGS: -I../
27#cgo LDFLAGS: -lrmr_nng -lnng
28*/
29import "C"
30
kalnagy45114752019-06-18 14:40:39 +020031import (
Peter Szilagyifbc56f92019-07-23 19:29:46 +000032 "errors"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "github.com/spf13/viper"
kalnagy93cc3e22019-09-19 11:29:29 +020035 "github.com/go-openapi/strfmt"
36 httptransport "github.com/go-openapi/runtime/client"
37 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
Peter Szilagyifbc56f92019-07-23 19:29:46 +000039 "math/rand"
40 "strconv"
41 "time"
kalnagy45114752019-06-18 14:40:39 +020042)
43
44type Control struct {
kalnagy93cc3e22019-09-19 11:29:29 +020045 e2ap *E2ap
46 registry *Registry
47 rtmgrClient *RtmgrClient
48 tracker *Tracker
Balint Uvegescd3881b2019-10-02 15:01:43 +000049 rc_chan chan *xapp.RMRParams
kalnagy93cc3e22019-09-19 11:29:29 +020050}
51
52type RMRMeid struct {
53 PlmnID string
54 EnbID string
55}
56
Peter Szilagyifbc56f92019-07-23 19:29:46 +000057var SEEDSN uint16
kalnagy93cc3e22019-09-19 11:29:29 +020058var SubscriptionReqChan = make(chan subRouteInfo, 10)
59
60const (
61 CREATE Action = 0
62 MERGE Action = 1
63 DELETE Action = 3
64)
Peter Szilagyifbc56f92019-07-23 19:29:46 +000065
66func init() {
67 viper.AutomaticEnv()
68 viper.SetEnvPrefix("submgr")
69 viper.AllowEmptyEnv(true)
70 SEEDSN = uint16(viper.GetInt("seed_sn"))
71 if SEEDSN == 0 {
72 rand.Seed(time.Now().UnixNano())
73 SEEDSN = uint16(rand.Intn(65535))
74 }
75 if SEEDSN > 65535 {
76 SEEDSN = 0
77 }
78 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
kalnagy45114752019-06-18 14:40:39 +020079}
80
81func NewControl() Control {
Peter Szilagyifbc56f92019-07-23 19:29:46 +000082 registry := new(Registry)
83 registry.Initialize(SEEDSN)
kalnagy93cc3e22019-09-19 11:29:29 +020084
Balint Uvegese9608cd2019-09-20 18:00:32 +000085 tracker := new(Tracker)
86 tracker.Init()
87
kalnagy93cc3e22019-09-19 11:29:29 +020088 transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 client := rtmgrclient.New(transport, strfmt.Default)
90 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
kalnagye0018682019-09-26 16:28:25 +020091 delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
92 rtmgrClient := RtmgrClient{client, handle, delete_handle}
kalnagy93cc3e22019-09-19 11:29:29 +020093
Balint Uvegescd3881b2019-10-02 15:01:43 +000094 return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
kalnagy45114752019-06-18 14:40:39 +020095}
96
97func (c *Control) Run() {
Balint Uvegescd3881b2019-10-02 15:01:43 +000098 go c.controlLoop()
Peter Szilagyifbc56f92019-07-23 19:29:46 +000099 xapp.Run(c)
kalnagy45114752019-06-18 14:40:39 +0200100}
101
kalnagy93cc3e22019-09-19 11:29:29 +0200102func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
Balint Uvegescd3881b2019-10-02 15:01:43 +0000103 c.rc_chan <- rp
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000104 return
kalnagy45114752019-06-18 14:40:39 +0200105}
106
kalnagy93cc3e22019-09-19 11:29:29 +0200107func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
108 if !xapp.Rmr.Send(params, false) {
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000109 err = errors.New("rmr.Send() failed")
110 }
111 return
kalnagy45114752019-06-18 14:40:39 +0200112}
113
kalnagye0018682019-09-26 16:28:25 +0200114func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
115 if !xapp.Rmr.Send(params, true) {
116 err = errors.New("rmr.Send() failed")
117 }
118 return
119}
120
Balint Uvegescd3881b2019-10-02 15:01:43 +0000121func (c *Control) controlLoop() {
122 for {
123 msg := <-c.rc_chan
124 switch msg.Mtype {
125 case C.RIC_SUB_REQ:
126 c.handleSubscriptionRequest(msg)
127 case C.RIC_SUB_RESP:
128 c.handleSubscriptionResponse(msg)
129 case C.RIC_SUB_DEL_REQ:
130 c.handleSubscriptionDeleteRequest(msg)
131 case C.RIC_SUB_DEL_RESP:
132 c.handleSubscriptionDeleteResponse(msg)
133 default:
134 err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
135 xapp.Logger.Error("Unknown message type: %v", err)
136 }
137 }
138}
139
kalnagy93cc3e22019-09-19 11:29:29 +0200140func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
141 payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000142 if err != nil {
143 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
144 return
145 }
kalnagy93cc3e22019-09-19 11:29:29 +0200146 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
147
148 /* Reserve a sequence number and set it in the payload */
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000149 new_sub_id := c.registry.ReserveSequenceNumber()
kalnagy93cc3e22019-09-19 11:29:29 +0200150
151 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000152 if err != nil {
153 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
154 return
155 }
kalnagy93cc3e22019-09-19 11:29:29 +0200156
157 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
158 if err != nil {
159 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
160 return
161 }
162
163 /* Create transatcion records for every subscription request */
164 xact_key := Transaction_key{new_sub_id, CREATE}
kalnagye0018682019-09-26 16:28:25 +0200165 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
Balint Uvegese9608cd2019-09-20 18:00:32 +0000166 err = c.tracker.Track_transaction(xact_key, xact_value)
kalnagy93cc3e22019-09-19 11:29:29 +0200167 if err != nil {
168 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
169 return
170 }
171
172 /* Update routing manager about the new subscription*/
173 sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
174 go c.rtmgrClient.SubscriptionRequestUpdate()
175 SubscriptionReqChan <- sub_route_action
176
177 // Setting new subscription ID in the RMR header
178 params.SubId = int(new_sub_id)
179
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000180 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
kalnagy93cc3e22019-09-19 11:29:29 +0200181 c.rmrSend(params)
kalnagye0018682019-09-26 16:28:25 +0200182 xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000183 return
kalnagy45114752019-06-18 14:40:39 +0200184}
185
kalnagy93cc3e22019-09-19 11:29:29 +0200186func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
187 payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000188 if err != nil {
189 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
190 return
191 }
kalnagy93cc3e22019-09-19 11:29:29 +0200192 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000193 if !c.registry.IsValidSequenceNumber(payload_seq_num) {
194 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
195 return
196 }
197 c.registry.setSubscriptionToConfirmed(payload_seq_num)
198 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
kalnagye0018682019-09-26 16:28:25 +0200199 transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
200 if err != nil {
201 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
202 return
203 }
204 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
205 params.Mbuf = transaction.Mbuf
206 c.rmrReplyToSender(params)
kalnagy93cc3e22019-09-19 11:29:29 +0200207 return
208}
209
210func (act Action) String() string {
211 actions := [...]string{
212 "CREATE",
213 "MERGE",
214 "DELETE",
215 }
216
217 if act < CREATE || act > DELETE {
218 return "Unknown"
219 }
220 return actions[act]
221}
222
223func (act Action) valid() bool {
224 switch act {
225 case CREATE, MERGE, DELETE:
226 return true
227 default:
228 return false
229 }
230}
231
232func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
233 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
234 if err != nil {
235 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
236 return
237 }
238 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
239 if c.registry.IsValidSequenceNumber(payload_seq_num) {
240 c.registry.deleteSubscription(payload_seq_num)
kalnagye0018682019-09-26 16:28:25 +0200241 trackErr := c.trackDeleteTransaction(params, payload_seq_num)
242 if trackErr != nil {
243 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
244 return trackErr
245 }
kalnagy93cc3e22019-09-19 11:29:29 +0200246 }
247 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
248 c.rmrSend(params)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000249 return
250}
kalnagye0018682019-09-26 16:28:25 +0200251
252func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
253 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
254 xact_key := Transaction_key{payload_seq_num, DELETE}
255 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
256 err = c.tracker.Track_transaction(xact_key, xact_value)
257 return
258}
259
260func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
261 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
262 if err != nil {
263 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
264 return
265 }
266 var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
267 sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
268 go c.rtmgrClient.SubscriptionRequestUpdate()
269 SubscriptionReqChan <- sub_route_action
270
271 xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
272 if c.registry.releaseSequenceNumber(payload_seq_num) {
273 transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
274 if err != nil {
275 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
276 return
277 }
278 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
279 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
280 params.Mbuf = transaction.Mbuf
281 c.rmrReplyToSender(params)
282 }
283 return
284}