blob: 74ac7e3bc8513ecf82b25b8ae4e2b7973e690e83 [file] [log] [blame]
from netconf_constant import *
from org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor import NetconfExecutorExtensionsKt
class NetconfClient:
def __init__(self, log, component_function, requirement_name):
self.log = log
self.component_function = component_function
netconf_device = NetconfExecutorExtensionsKt.netconfDevice(component_function, requirement_name)
self.netconf_rpc_client = netconf_device.netconfRpcService
self.netconf_session = netconf_device.netconfSession
def disconnect(self):
self.netconf_session.disconnect()
return
def connect(self):
self.netconf_session.connect()
return
def lock(self, config_target=CONFIG_TARGET_CANDIDATE):
device_response = self.netconf_rpc_client.lock(config_target)
return device_response
def get_config(self, filter="", config_target=CONFIG_TARGET_RUNNING):
device_response = self.netconf_rpc_client.getConfig(filter, config_target)
return device_response
def edit_config(self, message_content, config_target=CONFIG_TARGET_CANDIDATE,
edit_default_peration=CONFIG_DEFAULT_OPERATION_REPLACE):
device_response = self.netconf_rpc_client.editConfig(message_content,
config_target,
edit_default_peration)
return device_response
def commit(self, confirmed=False, confirm_timeout=60, persist="",
persist_id=""):
device_response = self.netconf_rpc_client.commit(confirmed, confirm_timeout,
persist, persist_id)
return device_response
def invoke_rpc(self, rpc):
device_response = self.netconf_rpc_client.invokeRpc(rpc)
return device_response
def cancel_commit(self, persist_id=""):
device_response = self.netconf_rpc_client.cancelCommit(persist_id)
return device_response
def unlock(self, config_target=CONFIG_TARGET_CANDIDATE):
device_response = self.netconf_rpc_client.unLock(config_target)
return device_response
def validate(self, config_target=CONFIG_TARGET_CANDIDATE):
device_response = self.netconf_rpc_client.validate(config_target)
return device_response
def discard_change(self):
device_response = self.netconf_rpc_client.discardConfig()
return device_response
def get(self, filter_content):
device_response = self.netconf_rpc_client.get(filter_content)
return device_response
def set_execution_attribute_response_data(self, response_data):
self.component_function.setAttribute(ATTRIBUTE_RESPONSE_DATA, response_data)