blob: 1ff53accb15f63b4a2ce3f270f773def659fa227 [file] [log] [blame]
#!/usr/bin/env python
################################################################################
# Copyright 2021 highstreet technologies GmbH
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# import the standard-library and third-party modules used by this script
import os
import sys
import json
import time
import getpass
import requests
import warnings
from jproperties import Properties
from typing import List
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
# global configurations
def get_environment_variable(name):
    """Return the value stored under ``name`` in the ``.env`` properties file.

    The file is expected one directory above the directory that contains
    this script and is parsed with ``jproperties.Properties``.

    Args:
        name: key to look up in the properties file.

    Returns:
        The ``data`` part of the property entry found for ``name``.

    Raises:
        KeyError: if ``name`` is not defined in the ``.env`` file.
    """
    configs = Properties()
    env_file = os.path.dirname(os.path.abspath(__file__)) + '/' + '../' + '.env'
    with open(env_file, "rb") as read_prop:
        configs.load(read_prop)
    entry = configs.get(name)
    if entry is None:
        # Without this guard a missing key surfaces as the opaque
        # "'NoneType' object has no attribute 'data'" error.
        raise KeyError('{} is not defined in {}'.format(name, env_file))
    return entry.data
def load_arguments(args: List[str]) -> tuple:
    """Parse the command line into ``(realm_file, auth_file, ready_timeout)``.

    Recognised options, each followed by a value:
      --auth <file>     path of the authentication (users/grants) JSON file
      --realm <file>    path of the realm JSON file
      --timeout <int>   value later passed to the readiness check

    Defaults are ``o-ran-sc-realm.json`` and ``authentication.json`` next to
    this script and a timeout of 180. Unknown arguments, and a recognised
    option that is not followed by a value, are ignored.

    Args:
        args: the argument vector; the first element (program name) is
            skipped. The list itself is not modified.

    Returns:
        Tuple ``(realm_file, auth_file, ready_timeout)``.

    Raises:
        ValueError: if the value given to ``--timeout`` is not an integer.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    realm_file = script_dir + '/o-ran-sc-realm.json'
    auth_file = script_dir + '/authentication.json'
    ready_timeout = 180

    # Work on a copy without the program name: the caller's list (usually
    # sys.argv) stays intact and an empty list no longer raises IndexError.
    pending = list(args[1:])
    while pending:
        arg = pending.pop(0)
        if arg == '--auth' and pending:
            auth_file = pending.pop(0)
            print('overwriting auth file: {}'.format(auth_file))
        elif arg == '--realm' and pending:
            realm_file = pending.pop(0)
            print('overwriting realm file: {}'.format(realm_file))
        elif arg == '--timeout' and pending:
            ready_timeout = int(pending.pop(0))
            print('waiting for ready {} seconds'.format(ready_timeout))
    return (realm_file, auth_file, ready_timeout)
def isReady(timeoutSeconds=180):
    """Poll the identity provider base URL until it answers with HTTP 200.

    Each attempt sends one GET request to the URL returned by
    ``getBaseUrl()`` and, if the answer is not a 200, sleeps one second.

    Args:
        timeoutSeconds: number of attempts (one per second of sleep) before
            giving up.

    Returns:
        True as soon as a response with status code 200 is received,
        False when all attempts are used up.
    """
    url = getBaseUrl()
    while timeoutSeconds > 0:
        # Reset per attempt: a failed request must neither leave the name
        # unbound nor let a response from an earlier attempt be re-used.
        response = None
        try:
            response = requests.get(url, verify=False, headers={})
        except requests.exceptions.RequestException:
            # Service not reachable yet; retry after the pause below.
            pass
        if response is not None and response.status_code == 200:
            return True
        time.sleep(1)
        timeoutSeconds -= 1
    return False
def getBaseUrl():
    """Return the value configured under IDENTITY_PROVIDER_URL."""
    identity_provider_url = get_environment_variable("IDENTITY_PROVIDER_URL")
    return identity_provider_url
# Request a token for further communication
def getToken():
    """Request an access token from the master realm's token endpoint.

    Uses the module-level ``base``, ``username`` and ``password`` values and
    the ``admin-cli`` client with the password grant.

    Returns:
        The ``access_token`` field of the JSON response.

    Exits the program (SystemExit) when the HTTP request fails or the
    response status is outside the 2xx range.
    """
    token_endpoint = base + '/realms/master/protocol/openid-connect/token'
    request_headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'accept': 'application/json'
    }
    form = dict(
        client_id='admin-cli',
        grant_type='password',
        username=username,
        password=password,
    )
    try:
        response = requests.post(
            token_endpoint,
            verify=False,
            auth=(username, password),
            data=form,
            headers=request_headers,
        )
    except requests.exceptions.Timeout:
        raise SystemExit(
            'HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        raise SystemExit(
            'HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as error:
        # Any other request failure is fatal for this script.
        sys.exit(error)

    if not 200 <= response.status_code < 300:
        sys.exit('Getting token failed.')
    print('Got token!')
    return response.json()['access_token']
# create the default realm from file
def createRealm(token, realm):
    """POST the given realm representation to ``<base>/admin/realms``.

    Args:
        token: bearer token sent in the authorization header.
        realm: realm definition, sent as the JSON request body.

    Returns:
        True when the response status is in the 2xx range, else False.

    Exits the program (SystemExit) when the HTTP request itself fails.
    """
    endpoint = base + '/admin/realms'
    request_headers = {
        'content-type': 'application/json',
        'accept': 'application/json',
        'authorization': 'bearer ' + token
    }
    try:
        response = requests.post(
            endpoint, verify=False, json=realm, headers=request_headers)
    except requests.exceptions.Timeout:
        raise SystemExit(
            'HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        raise SystemExit(
            'HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as error:
        # Any other request failure is fatal for this script.
        sys.exit(error)
    return 200 <= response.status_code < 300
# Check if default realm exists
def checkRealmExists(token, realmId):
    """Tell whether a realm with the given id can be fetched from the server.

    Args:
        token: bearer token sent in the authorization header.
        realmId: id appended to ``<base>/admin/realms/``.

    Returns:
        True when the response is 2xx and its ``id`` field equals
        ``realmId``; False for any non-2xx response.

    Exits the program (SystemExit) when the HTTP request itself fails.
    """
    endpoint = base + '/admin/realms/' + realmId
    request_headers = {
        'accept': 'application/json',
        'authorization': 'bearer ' + token
    }
    try:
        response = requests.get(
            endpoint, verify=False, headers=request_headers)
    except requests.exceptions.Timeout:
        raise SystemExit(
            'HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        raise SystemExit(
            'HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as error:
        # Any other request failure is fatal for this script.
        sys.exit(error)

    if not 200 <= response.status_code < 300:
        # A failed lookup is reported as "realm does not exist".
        return False
    return realmId == response.json()['id']
# create a user in default realm
def createUser(token, realmConfig, user):
    """POST one user representation to the realm's ``/users`` endpoint.

    Args:
        token: bearer token sent in the authorization header.
        realmConfig: realm definition; only its ``id`` is used here.
        user: user definition, sent as the JSON request body.

    Prints whether the creation succeeded (2xx) or failed, including the
    response text on failure. Exits the program (SystemExit) when the HTTP
    request itself fails.
    """
    endpoint = base + '/admin/realms/' + realmConfig['id'] + '/users'
    request_headers = {
        'accept': 'application/json',
        'authorization': 'bearer ' + token
    }
    try:
        response = requests.post(
            endpoint, verify=False, json=user, headers=request_headers)
    except requests.exceptions.Timeout:
        raise SystemExit(
            'HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        raise SystemExit(
            'HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as error:
        # Any other request failure is fatal for this script.
        sys.exit(error)

    succeeded = 200 <= response.status_code < 300
    if succeeded:
        print('User', user['username'], 'created!')
    else:
        print('User creation', user['username'], 'failed!\n', response.text)
# creates user accounts in the realm based on a file
def createUsers(token, realmConfig, authConfig):
    """Create every user listed in ``authConfig['users']`` plus one extra
    account named after the operating-system user running this script.

    Args:
        token: bearer token passed on to ``createUser``.
        realmConfig: realm definition passed on to ``createUser``.
        authConfig: mapping whose ``users`` entry lists user definitions.
    """
    for account in authConfig['users']:
        createUser(token, realmConfig, account)

    # create a user based on system user; it gets the module-level
    # ``password`` as a non-temporary credential
    login = getpass.getuser()
    credential = {
        "type": "password",
        "value": password,
        "temporary": False
    }
    systemUser = {
        "firstName": login,
        "lastName": "",
        "email": login + "@sdnr.onap.org",
        "enabled": "true",
        "username": login,
        "credentials": [credential]
    }
    createUser(token, realmConfig, systemUser)
# Grants a role to a user
def addUserRole(user: dict, role: dict, options: dict):
    """POST a realm role mapping for one user.

    Args:
        user: user representation; ``id`` and ``username`` are read.
        role: role representation; ``id`` and ``name`` are sent.
        options: mapping providing the users endpoint under ``url`` and the
            request headers under ``headers``.

    Prints whether the mapping was created (2xx) or not, including the
    response text on failure. Exits the program (SystemExit) when the HTTP
    request itself fails.
    """
    endpoint = options['url'] + '/' + user['id'] + '/role-mappings/realm'
    try:
        mapping = [{'id': role['id'], 'name': role['name']}]
        response = requests.post(
            endpoint, verify=False, json=mapping, headers=options['headers'])
    except requests.exceptions.Timeout:
        raise SystemExit(
            'HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        raise SystemExit(
            'HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as error:
        # Any other request failure is fatal for this script.
        sys.exit(error)

    if not 200 <= response.status_code < 300:
        print('Creation of user role',
              user['username'], role['name'], 'failed!\n', response.text)
    else:
        print('User role', user['username'], role['name'], 'created!')
# searches for the role of a given user
def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict:
    """Look up the realm role definition to grant to ``username``.

    The role name comes from the last entry in ``authConfig['grants']``
    whose ``username`` matches; without a match it is 'administration'.

    Returns:
        The first entry of ``realmConfig['roles']['realm']`` carrying that
        name, or None when the realm defines no such role.
    """
    granted = [
        grant['role']
        for grant in authConfig['grants']
        if grant['username'] == username
    ]
    # When several grants match, the last one wins.
    wanted = granted[-1] if granted else 'administration'
    candidates = (
        role for role in realmConfig['roles']['realm']
        if role['name'] == wanted
    )
    return next(candidates, None)
# adds roles to users
def addUserRoles(token, realmConfig, authConfig):
    """Fetch the realm's users and grant each of them its configured role.

    The role for a user is resolved with ``findRole``. Users for which no
    role definition is found are reported and skipped, instead of passing
    None on to ``addUserRole``, where ``role['id']`` would raise TypeError.

    Args:
        token: bearer token sent in the authorization header.
        realmConfig: realm definition; ``id`` is used here and the whole
            mapping is passed to ``findRole``.
        authConfig: authentication configuration passed to ``findRole``.

    Exits the program (SystemExit) when the HTTP request fails or the user
    list cannot be retrieved (non-2xx response).
    """
    realmId = realmConfig['id']
    url = base + '/admin/realms/' + realmId + '/users'
    auth = 'bearer ' + token
    headers = {
        'content-type': 'application/json',
        'accept': 'application/json',
        'authorization': auth
    }
    try:
        response = requests.get(url, verify=False, headers=headers)
    except requests.exceptions.Timeout:
        sys.exit('HTTP request failed, please check you internet connection.')
    except requests.exceptions.TooManyRedirects:
        sys.exit('HTTP request failed, please check your proxy settings.')
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        raise SystemExit(e)

    if not 200 <= response.status_code < 300:
        sys.exit('Getting users failed.')

    users = response.json()
    options = {
        "url": url,
        "auth": auth,
        "headers": headers
    }
    for user in users:
        role = findRole(user['username'], authConfig, realmConfig)
        if role is None:
            # The role named by the grant (or the default one) is not
            # defined in the realm file; nothing can be mapped.
            print('No realm role found for user',
                  user['username'], '- skipping role assignment.')
            continue
        addUserRole(user, role, options)
# main
# Script entry: read the configuration, wait for the identity provider,
# then create the realm (if missing), the users and their role mappings.
(realmFile, authFile, readyTimeout) = load_arguments(sys.argv)
# username, password and base are module-level names read by the functions
# defined above, so they must stay global.
username = get_environment_variable('ADMIN_USERNAME')
password = get_environment_variable('ADMIN_PASSWORD')
base = getBaseUrl()
if not isReady(readyTimeout):
    # Not fatal: the token request below reports a precise error if the
    # service really is unreachable.
    print('Identity provider not ready after {} attempts, trying anyway.'
          .format(readyTimeout))
token = getToken()
if token:
    with open(realmFile) as file:
        realmConfig = json.load(file)
    if not checkRealmExists(token, realmConfig['id']):
        if not createRealm(token, realmConfig):
            print('Realm creation failed.')
    with open(authFile) as file:
        authConfig = json.load(file)
    createUsers(token, realmConfig, authConfig)
    addUserRoles(token, realmConfig, authConfig)
    # sys.exit instead of the site-provided exit(), which is meant for the
    # interactive interpreter and is absent when site is not loaded.
    sys.exit(0)
sys.exit(1)