blob: 37967e1d4628ff629a96642094a3520d950aa561 [file] [log] [blame]
#!/usr/bin/env python
################################################################################
# Copyright 2021 highstreet technologies GmbH
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# importing the sys, json, requests library
from sqlite3 import TimeFromTicks
from jproperties import Properties
import os
import sys
import json
import time
import re
import requests
import getpass
import warnings
from typing import List
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
# global configurations
def get_env(name):
configs = Properties()
envFile = os.path.dirname(os.path.abspath(__file__)) + '/' + '../' + '.env'
with open(envFile, "rb") as read_prop:
configs.load(read_prop)
value = configs.get(name).data
regex = r"\$\{([^\}]+)\}"
matches = re.finditer(regex, value)
while True:
match = next(matches, None)
if match is None:
break
inner = get_env(match.group(1))
value = value.replace("${" + match.group(1) + "}", inner)
return value
def loadArgs(args: List[str]) -> tuple:
realmFile = os.path.dirname(os.path.abspath(__file__)) + '/o-ran-sc-realm.json'
authFile = os.path.dirname(os.path.abspath(__file__)) + '/authentication.json'
readyTimeout = 180
args.pop(0)
while len(args) > 0:
arg = args.pop(0)
if arg == '--auth' and len(args) > 0:
authFile = args.pop(0)
print('overwriting auth file: {}'.format(authFile))
elif arg == '--realm' and len(args) > 0:
realmFile = args.pop(0)
print('overwriting realm file: {}'.format(realmFile))
elif arg == '--timeout' and len(args) > 0:
readyTimeout = int(args.pop(0))
print('waiting for ready {} seconds'.format(readyTimeout))
return (realmFile, authFile, readyTimeout)
def isReady(timeoutSeconds=180):
url = getBaseUrl()
print(url)
response = None
print("waiting for ready state", end='')
while timeoutSeconds > 0:
try:
response = requests.get(url, verify=False, headers={})
print(response)
except:
pass
if response is not None and response.status_code == 200:
print('succeeded')
return True
time.sleep(1)
timeoutSeconds -= 1
print('.', end='', flush=True)
return False
def getBaseUrl():
try:
if get_env("USE_LOCAL_HOST_FOR_IDENTITY_CONFIG").strip("'\"") == "true":
return get_env("IDENTITY_PROVIDER_URL_LOCAL_HOST")
except AttributeError:
print("Using IDENTITY_PROVIDER_URL")
return get_env("IDENTITY_PROVIDER_URL")
# Request a token for futher communication
def getToken():
url = base + '/realms/master/protocol/openid-connect/token'
headers = {
'content-type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
}
body = {
'client_id': 'admin-cli',
'grant_type': 'password',
'username': username,
'password': password
}
try:
response = requests.post(url, verify=False, auth=(username, password), data=body, headers=headers)
except requests.exceptions.Timeout:
sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
if response.status_code >= 200 and response.status_code < 300:
print('Got token!')
return response.json()['access_token']
else:
sys.exit('Getting token failed.')
# create the default realm from file
def createRealm(token, realm):
url = base + '/admin/realms'
auth = 'bearer ' + token
headers = {
'content-type': 'application/json',
'accept': 'application/json',
'authorization': auth
}
try:
response = requests.post(url, verify=False, json=realm, headers=headers)
except requests.exceptions.Timeout:
sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
return response.status_code >= 200 and response.status_code < 300
# Check if default realm exists
def checkRealmExists(token, realmId):
url = base + '/admin/realms/' + realmId
auth = 'bearer ' + token
headers = {
'accept': 'application/json',
'authorization': auth
}
try:
response = requests.get(url, verify=False, headers=headers)
except requests.exceptions.Timeout:
sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
if response.status_code >= 200 and response.status_code < 300:
return realmId == response.json()['id']
else:
# sys.exit('Getting realm failed.')
return False
# create a user in default realm
def createUser(token, realmConfig, user):
realmId = realmConfig['id']
url = base + '/admin/realms/' + realmId + '/users'
auth = 'bearer ' + token
headers = {
'accept': 'application/json',
'authorization': auth
}
try:
response = requests.post(url, verify=False, json=user, headers=headers)
except requests.exceptions.Timeout:
sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
if response.status_code >= 200 and response.status_code < 300:
print('User', user['username'], 'created!')
else:
print('User creation', user['username'], 'failed!\n', response.text)
# creates User accounts in realm based a file
def createUsers(token, realmConfig, authConfig):
for user in authConfig['users']:
createUser(token, realmConfig, user)
# create a user based on system user
systemUser = {
"firstName": getpass.getuser(),
"lastName": "",
"email": getpass.getuser() + "@sdnr.onap.org",
"enabled": "true",
"username": getpass.getuser(),
"credentials": [
{
"type": "password",
"value": password,
"temporary": False
}
]
}
createUser(token, realmConfig, systemUser)
# Grants a role to a user
def addUserRole(user: dict, role: list, options: dict):
url = options['url'] + '/' + user['id'] + '/role-mappings/realm'
try:
for irole in role:
response = requests.post(url, verify=False, json=[{'id': irole['id'], 'name': irole['name']}],
headers=options['headers'])
if response.status_code >= 200 and response.status_code < 300:
print('User role', user['username'], irole['name'], 'created!')
else:
print('Creation of user role', user['username'], irole['name'], 'failed!\n', response.text)
except requests.exceptions.Timeout:
sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
# searches for the role of a given user
def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
roleList = []
roleNames = []
roleName = 'administration'
for grant in authConfig['grants']:
if grant['username'] == username:
roleName = grant['role']
roleNames = roleName.split(",") # A user can have multiple roles, comma separated
for iroleName in roleNames:
for role in realmConfig['roles']['realm']:
if role['name'] == iroleName:
roleList.append(role)
return roleList
# adds roles to users
def addUserRoles(token, realmConfig, authConfig):
realmId = realmConfig['id']
url = base + '/admin/realms/' + realmId + '/users'
auth = 'bearer ' + token
headers = {
'content-type': 'application/json',
'accept': 'application/json',
'authorization': auth
}
try:
response = requests.get(url, verify=False, headers=headers)
except requests.exceptions.Timeout:
sys.exit('HTTP request failed, please check you internet connection.')
except requests.exceptions.TooManyRedirects:
sys.exit('HTTP request failed, please check your proxy settings.')
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
if response.status_code >= 200 and response.status_code < 300:
users = response.json()
options = {
"url": url,
"auth": auth,
"headers": headers
}
for user in users:
role = findRole(user['username'], authConfig, realmConfig)
addUserRole(user, role, options)
else:
sys.exit('Getting users failed.')
# main
(realmFile, authFile, readyTimeout) = loadArgs(sys.argv)
username = get_env('ADMIN_USERNAME')
password = get_env('ADMIN_PASSWORD')
base = getBaseUrl()
isReady(readyTimeout)
token = getToken()
if token:
with open(realmFile) as file:
realmConfig = json.load(file)
if not checkRealmExists(token, realmConfig['id']):
createRealm(token, realmConfig)
with open(authFile) as authConfig:
authConfig = json.load(authConfig)
createUsers(token, realmConfig, authConfig)
addUserRoles(token, realmConfig, authConfig)
exit(0)
exit(1)