blob: ec6419e3794bfc149c199bfe50424a4c9a1fbbe8 [file] [log] [blame]
kalnagy45114752019-06-18 14:40:39 +02001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package control
21
22/*
23#include <rmr/RIC_message_types.h>
kalnagy93cc3e22019-09-19 11:29:29 +020024#include <rmr/rmr.h>
kalnagy45114752019-06-18 14:40:39 +020025
26#cgo CFLAGS: -I../
27#cgo LDFLAGS: -lrmr_nng -lnng
28*/
29import "C"
30
kalnagy45114752019-06-18 14:40:39 +020031import (
Peter Szilagyifbc56f92019-07-23 19:29:46 +000032 "errors"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "github.com/spf13/viper"
kalnagy93cc3e22019-09-19 11:29:29 +020035 "github.com/go-openapi/strfmt"
36 httptransport "github.com/go-openapi/runtime/client"
37 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
Peter Szilagyifbc56f92019-07-23 19:29:46 +000039 "math/rand"
40 "strconv"
41 "time"
kalnagy45114752019-06-18 14:40:39 +020042)
43
44type Control struct {
kalnagy93cc3e22019-09-19 11:29:29 +020045 e2ap *E2ap
46 registry *Registry
47 rtmgrClient *RtmgrClient
48 tracker *Tracker
49}
50
51type RMRMeid struct {
52 PlmnID string
53 EnbID string
54}
55
Peter Szilagyifbc56f92019-07-23 19:29:46 +000056var SEEDSN uint16
kalnagy93cc3e22019-09-19 11:29:29 +020057var SubscriptionReqChan = make(chan subRouteInfo, 10)
58
59const (
60 CREATE Action = 0
61 MERGE Action = 1
62 DELETE Action = 3
63)
Peter Szilagyifbc56f92019-07-23 19:29:46 +000064
65func init() {
66 viper.AutomaticEnv()
67 viper.SetEnvPrefix("submgr")
68 viper.AllowEmptyEnv(true)
69 SEEDSN = uint16(viper.GetInt("seed_sn"))
70 if SEEDSN == 0 {
71 rand.Seed(time.Now().UnixNano())
72 SEEDSN = uint16(rand.Intn(65535))
73 }
74 if SEEDSN > 65535 {
75 SEEDSN = 0
76 }
77 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
kalnagy45114752019-06-18 14:40:39 +020078}
79
80func NewControl() Control {
Peter Szilagyifbc56f92019-07-23 19:29:46 +000081 registry := new(Registry)
82 registry.Initialize(SEEDSN)
kalnagy93cc3e22019-09-19 11:29:29 +020083
Balint Uvegese9608cd2019-09-20 18:00:32 +000084 tracker := new(Tracker)
85 tracker.Init()
86
kalnagy93cc3e22019-09-19 11:29:29 +020087 transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 client := rtmgrclient.New(transport, strfmt.Default)
89 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
kalnagye0018682019-09-26 16:28:25 +020090 delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 rtmgrClient := RtmgrClient{client, handle, delete_handle}
kalnagy93cc3e22019-09-19 11:29:29 +020092
Balint Uvegese9608cd2019-09-20 18:00:32 +000093 return Control{new(E2ap), registry, &rtmgrClient, tracker}
kalnagy45114752019-06-18 14:40:39 +020094}
95
96func (c *Control) Run() {
Peter Szilagyifbc56f92019-07-23 19:29:46 +000097 xapp.Run(c)
kalnagy45114752019-06-18 14:40:39 +020098}
99
kalnagy93cc3e22019-09-19 11:29:29 +0200100func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
101 switch rp.Mtype {
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000102 case C.RIC_SUB_REQ:
kalnagy93cc3e22019-09-19 11:29:29 +0200103 err = c.handleSubscriptionRequest(rp)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000104 case C.RIC_SUB_RESP:
kalnagy93cc3e22019-09-19 11:29:29 +0200105 err = c.handleSubscriptionResponse(rp)
106 case C.RIC_SUB_DEL_REQ:
107 err = c.handleSubscriptionDeleteRequest(rp)
kalnagye0018682019-09-26 16:28:25 +0200108 case C.RIC_SUB_DEL_RESP:
109 err = c.handleSubscriptionDeleteResponse(rp)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000110 default:
kalnagy93cc3e22019-09-19 11:29:29 +0200111 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000112 }
113 return
kalnagy45114752019-06-18 14:40:39 +0200114}
115
kalnagy93cc3e22019-09-19 11:29:29 +0200116func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
117 if !xapp.Rmr.Send(params, false) {
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000118 err = errors.New("rmr.Send() failed")
119 }
120 return
kalnagy45114752019-06-18 14:40:39 +0200121}
122
kalnagye0018682019-09-26 16:28:25 +0200123func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
124 if !xapp.Rmr.Send(params, true) {
125 err = errors.New("rmr.Send() failed")
126 }
127 return
128}
129
kalnagy93cc3e22019-09-19 11:29:29 +0200130func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
131 payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000132 if err != nil {
133 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
134 return
135 }
kalnagy93cc3e22019-09-19 11:29:29 +0200136 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
137
138 /* Reserve a sequence number and set it in the payload */
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000139 new_sub_id := c.registry.ReserveSequenceNumber()
kalnagy93cc3e22019-09-19 11:29:29 +0200140
141 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000142 if err != nil {
143 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
144 return
145 }
kalnagy93cc3e22019-09-19 11:29:29 +0200146
147 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
148 if err != nil {
149 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
150 return
151 }
152
153 /* Create transatcion records for every subscription request */
154 xact_key := Transaction_key{new_sub_id, CREATE}
kalnagye0018682019-09-26 16:28:25 +0200155 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
Balint Uvegese9608cd2019-09-20 18:00:32 +0000156 err = c.tracker.Track_transaction(xact_key, xact_value)
kalnagy93cc3e22019-09-19 11:29:29 +0200157 if err != nil {
158 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
159 return
160 }
161
162 /* Update routing manager about the new subscription*/
163 sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
164 go c.rtmgrClient.SubscriptionRequestUpdate()
165 SubscriptionReqChan <- sub_route_action
166
167 // Setting new subscription ID in the RMR header
168 params.SubId = int(new_sub_id)
169
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000170 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
kalnagy93cc3e22019-09-19 11:29:29 +0200171 c.rmrSend(params)
kalnagye0018682019-09-26 16:28:25 +0200172 xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000173 return
kalnagy45114752019-06-18 14:40:39 +0200174}
175
kalnagy93cc3e22019-09-19 11:29:29 +0200176func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
177 payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000178 if err != nil {
179 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
180 return
181 }
kalnagy93cc3e22019-09-19 11:29:29 +0200182 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000183 if !c.registry.IsValidSequenceNumber(payload_seq_num) {
184 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
185 return
186 }
187 c.registry.setSubscriptionToConfirmed(payload_seq_num)
188 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
kalnagye0018682019-09-26 16:28:25 +0200189 transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
190 if err != nil {
191 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
192 return
193 }
194 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
195 params.Mbuf = transaction.Mbuf
196 c.rmrReplyToSender(params)
kalnagy93cc3e22019-09-19 11:29:29 +0200197 return
198}
199
200func (act Action) String() string {
201 actions := [...]string{
202 "CREATE",
203 "MERGE",
204 "DELETE",
205 }
206
207 if act < CREATE || act > DELETE {
208 return "Unknown"
209 }
210 return actions[act]
211}
212
213func (act Action) valid() bool {
214 switch act {
215 case CREATE, MERGE, DELETE:
216 return true
217 default:
218 return false
219 }
220}
221
222func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
223 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
224 if err != nil {
225 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
226 return
227 }
228 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
229 if c.registry.IsValidSequenceNumber(payload_seq_num) {
230 c.registry.deleteSubscription(payload_seq_num)
kalnagye0018682019-09-26 16:28:25 +0200231 trackErr := c.trackDeleteTransaction(params, payload_seq_num)
232 if trackErr != nil {
233 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
234 return trackErr
235 }
kalnagy93cc3e22019-09-19 11:29:29 +0200236 }
237 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
238 c.rmrSend(params)
Peter Szilagyifbc56f92019-07-23 19:29:46 +0000239 return
240}
kalnagye0018682019-09-26 16:28:25 +0200241
242func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
243 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
244 xact_key := Transaction_key{payload_seq_num, DELETE}
245 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
246 err = c.tracker.Track_transaction(xact_key, xact_value)
247 return
248}
249
250func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
251 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
252 if err != nil {
253 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
254 return
255 }
256 var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
257 sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
258 go c.rtmgrClient.SubscriptionRequestUpdate()
259 SubscriptionReqChan <- sub_route_action
260
261 xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
262 if c.registry.releaseSequenceNumber(payload_seq_num) {
263 transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
264 if err != nil {
265 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
266 return
267 }
268 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
269 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
270 params.Mbuf = transaction.Mbuf
271 c.rmrReplyToSender(params)
272 }
273 return
274}