blob: 8805bdd58de5bc99cb8fd72535b51d19168da5fa [file] [log] [blame]
# Copyright (c) 2019 AT&T Intellectual Property.
# Copyright (c) 2018-2019 Nokia.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This source code is part of the near-RT RIC (RAN Intelligent Controller)
# platform project (RICP).
#
"""The module provides Shared Data Layer (SDL) database backend interface."""
from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
from abc import ABC, abstractmethod
class DbBackendAbc(ABC):
"""An abstract Shared Data Layer (SDL) class providing database backend interface."""
@abstractmethod
def is_connected(self):
"""Test database backend connection."""
pass
@abstractmethod
def close(self):
"""Close database backend connection."""
pass
@abstractmethod
def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
"""Write key value data mapping to database under a namespace."""
pass
@abstractmethod
def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
""""Write key value to database under a namespace if the old value is expected one."""
pass
@abstractmethod
def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
""""Write key value to database under a namespace if key doesn't exist."""
pass
@abstractmethod
def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
""""Return values of the keys under a namespace."""
pass
@abstractmethod
def find_keys(self, ns: str, key_pattern: str) -> List[str]:
""""Return all the keys matching search pattern under a namespace in database."""
pass
@abstractmethod
def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
"""
Return all the keys with their values matching search pattern under a namespace in
database.
"""
pass
@abstractmethod
def remove(self, ns: str, keys: List[str]) -> None:
"""Remove keys and their data from database."""
pass
@abstractmethod
def remove_if(self, ns: str, key: str, data: bytes) -> bool:
"""
Remove key and its data from database if if the current data value is expected
one.
"""
pass
@abstractmethod
def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
"""Add new members to a group under a namespace in database."""
pass
@abstractmethod
def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
"""Remove members from a group under a namespace in database."""
pass
@abstractmethod
def remove_group(self, ns: str, group: str) -> None:
"""Remove a group under a namespace in database along with it's members."""
pass
@abstractmethod
def get_members(self, ns: str, group: str) -> Set[bytes]:
"""Get all the members of a group under a namespace in database."""
pass
@abstractmethod
def is_member(self, ns: str, group: str, member: bytes) -> bool:
"""Validate if a given member is in the group under a namespace in database."""
pass
@abstractmethod
def group_size(self, ns: str, group: str) -> int:
"""Return the number of members in a group under a namespace in database."""
pass
@abstractmethod
def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
data_map: Dict[str, bytes]) -> None:
"""Publish event to channel after writing data."""
pass
@abstractmethod
def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
old_data: bytes, new_data: bytes) -> bool:
"""
Publish event to channel after writing key value to database under a namespace
if the old value is expected one.
"""
pass
@abstractmethod
def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
key: str, data: bytes) -> bool:
""""
Publish event to channel after writing key value to database under a namespace if
key doesn't exist.
"""
pass
@abstractmethod
def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
keys: List[str]) -> None:
"""Publish event to channel after removing data."""
pass
@abstractmethod
def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
data: bytes) -> bool:
"""
Publish event to channel after removing key and its data from database if the
current data value is expected one.
"""
pass
@abstractmethod
def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
"""
Publish event to channel after removing all keys in namespace.
"""
pass
@abstractmethod
def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
"""
This takes a callback function and one or many channels to be subscribed.
When an event is received for the given channel, the given callback function
shall be called with channel and notification(s) as parameter.
"""
pass
@abstractmethod
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
"""Unsubscribes from channel and removes set callback function."""
pass
@abstractmethod
def start_event_listener(self) -> None:
"""
start_event_listener creates an event loop in a separate thread for handling
notifications from subscriptions.
"""
pass
@abstractmethod
def handle_events(self) -> Optional[Tuple[str, List[str]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from notification.
"""
pass
class DbBackendLockAbc(ABC):
"""
An abstract Shared Data Layer (SDL) class providing database backend lock interface.
Args:
ns (str): Namespace under which this lock is targeted.
name (str): Lock name, identifies the lock key in a database backend.
"""
def __init__(self, ns: str, name: str) -> None:
self._ns = ns
self._lock_name = name
super().__init__()
@abstractmethod
def acquire(self, retry_interval: Union[int, float] = 0.1,
retry_timeout: Union[int, float] = 10) -> bool:
"""Acquire a database lock."""
pass
@abstractmethod
def release(self) -> None:
"""Release a database lock."""
pass
@abstractmethod
def refresh(self) -> None:
"""Refresh the remaining validity time of the database lock back to a initial value."""
pass
@abstractmethod
def get_validity_time(self) -> Union[int, float]:
"""Return remaining validity time of the lock in seconds."""
pass