blob: 0bd5b248586937f1f5d36a23a85b22026bd777b5 [file] [log] [blame]
# Copyright (c) 2022 Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Subscription interface implements the subscription manager REST based interface defined in
# https://docs.o-ran-sc.org/projects/o-ran-sc-ric-plt-submgr/en/latest/user-guide.html
#
import ricxappframe.subsclient as subsclient
import ricxappframe.xapp_rest as ricrest
from mdclogpy import Logger
logging = Logger(name=__name__)
class NewSubscriber():
def __init__(self, uri, timeout=None, local_address="0.0.0.0", local_port=8088, rmr_port=4061):
"""
init
Parameters
----------
uri: string
xapp submgr service uri
timeout: int
rest method timeout
local_address: string
local interface IP address for rest service binding (for response handler)
local_port: int
local service port nunber for rest service binding (for response handler)
rmr_port: int
rmr port number
"""
self.uri = uri
self.timeout = timeout
self.local_address = local_address
self.local_port = local_port
self.rmr_port = rmr_port
self.url = "/ric/v1/subscriptions/response"
self.serverHandler = None
self.responseCB = None
# Configure API
configuration = subsclient.Configuration()
configuration.verify_ssl = False
configuration.host = "http://127.0.0.1:8088/"
self.api = subsclient.ApiClient(configuration)
def _responsePostHandler(self, name, path, data, ctype):
"""
_resppnsePostHandler
internally used subscription reponse handler it the callback function is not set
"""
return "{}", 'application/json', "OK", 200
# following methods are wrappers to hide the swagger client
def SubscriptionParamsClientEndpoint(self, host=None, http_port=None, rmr_port=None):
return subsclient.SubscriptionParamsClientEndpoint(host, http_port, rmr_port)
def SubscriptionParamsE2SubscriptionDirectives(self, e2_timeout_timer_value=None, e2_retry_count=None, rmr_routing_needed=None):
return subsclient.SubscriptionParamsE2SubscriptionDirectives(e2_timeout_timer_value, e2_retry_count, rmr_routing_needed)
def SubsequentAction(self, subsequent_action_type=None, time_to_wait=None):
return subsclient.SubsequentAction(subsequent_action_type, time_to_wait)
def ActionToBeSetup(self, action_id=None, action_type=None, action_definition=None, subsequent_action=None):
return subsclient.ActionToBeSetup(action_id, action_type, action_definition, subsequent_action)
def SubscriptionDetail(self, xapp_event_instance_id=None, event_triggers=None, action_to_be_setup_list=None):
return subsclient.SubscriptionDetail(xapp_event_instance_id, event_triggers, action_to_be_setup_list)
def SubscriptionParams(self, subscription_id=None, client_endpoint=None, meid=None, ran_function_id=None, e2_subscription_directives=None, subscription_details=None):
return subsclient.SubscriptionParams(subscription_id, client_endpoint, meid, ran_function_id, e2_subscription_directives, subscription_details)
def Subscribe(self, subs_params=None):
"""
Subscribe
subscription request
Parameters
----------
subs_params: SubscriptionParams
structured subscription data definition defined in subsclient
Returns
-------
SubscriptionResponse
json string of SubscriptionResponse object
"""
# if subs_params is not None and type(subs_params) is subsclient.models.subscription_params.SubscriptionParams:
if subs_params is not None:
response = self.api.request(method="POST", url=self.uri, headers=None, body=subs_params.to_dict())
return response.data, response.reason, response.status
return None, "Input parameter is not SubscriptionParams{}", 500
def UnSubscribe(self, subs_id=None):
"""
UnSubscribe
subscription remove
Parameters
----------
subs_id: int
subscription id returned in SubscriptionResponse
Returns
-------
response.reason: string
http reason
response.status: int
http status code
"""
response = self.api.request(method="DELETE", url=self.uri + "/subscriptions/" + subs_id, headers=None)
return response.data, response.reason, response.status
def QuerySubscriptions(self):
"""
QuerySubscriptions
Query all subscriptions
Returns
-------
response.data: json string
SubscriptionList
response.reason: string
http reason
response.status: int
http status code
"""
response = self.api.request(method="GET", url=self.uri + "/subscriptions", headers=None)
return response.data, response.reason, response.status
def ResponseHandler(self, responseCB=None, server=None):
"""
ResponseHandler
Starts the response handler and set the callback
Parameters
----------
responseCB
Set the callback handler, if not set the the default self._responsePostHandler is used
server: xapp_rest.ThreadedHTTPServer
if set then the existing xapp_rest.ThreadedHTTPServer handler is used, otherwise a new will be created
Returns
-------
status: boolean
True = success, False = failed
"""
# create the thread HTTP server
self.serverHandler = server
if self.serverHandler is None:
# make the serverhandler
self.serverHandler = ricrest.ThreadedHTTPServer(self.local_address, self.local_port)
self.serverHandler.start()
if self.serverHandler is not None:
if responseCB is not None:
self.responseCB = responseCB
# get http handler with object reference
self.serverHandler.handler.add_handler(self.serverHandler.handler, "POST", "response", self.url, responseCB)
return True
else:
return False