| #!/usr/bin/env python |
| ################################################################################ |
| # Copyright 2021 highstreet technologies GmbH |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # importing the sys, json, requests library |
| import os |
| import sys |
| import json |
| import time |
| import getpass |
| import requests |
| import warnings |
| from jproperties import Properties |
| from typing import List |
| warnings.filterwarnings('ignore', message='Unverified HTTPS request') |
| # global configurations |
| |
| |
| def get_environment_variable(name): |
| configs = Properties() |
| env_file = os.path.dirname(os.path.abspath(__file__)) + '/' + '../' + '.env' |
| with open(env_file, "rb") as read_prop: |
| configs.load(read_prop) |
| return configs.get(name).data |
| |
| |
| def load_arguments(args: List[str]) -> tuple: |
| realm_file = os.path.dirname(os.path.abspath( |
| __file__)) + '/o-ran-sc-realm.json' |
| auth_file = os.path.dirname(os.path.abspath( |
| __file__)) + '/authentication.json' |
| ready_timeout = 180 |
| args.pop(0) |
| while len(args) > 0: |
| arg = args.pop(0) |
| if arg == '--auth' and len(args) > 0: |
| auth_file = args.pop(0) |
| print('overwriting auth file: {}'.format(auth_file)) |
| elif arg == '--realm' and len(args) > 0: |
| realm_file = args.pop(0) |
| print('overwriting realm file: {}'.format(realm_file)) |
| elif arg == '--timeout' and len(args) > 0: |
| ready_timeout = int(args.pop(0)) |
| print('waiting for ready {} seconds'.format(ready_timeout)) |
| |
| return (realm_file, auth_file, ready_timeout) |
| |
| |
| def isReady(timeoutSeconds=180): |
| url = getBaseUrl() |
| while timeoutSeconds > 0: |
| try: |
| response = requests.get(url, verify=False, headers={}) |
| except: |
| pass |
| if response is not None and response.status_code == 200: |
| return True |
| time.sleep(1) |
| timeoutSeconds -= 1 |
| return False |
| |
| |
| def getBaseUrl(): |
| return get_environment_variable("IDENTITY_PROVIDER_URL") |
| |
| # Request a token for further communication |
| def getToken(): |
| url = base + '/realms/master/protocol/openid-connect/token' |
| headers = { |
| 'content-type': 'application/x-www-form-urlencoded', |
| 'accept': 'application/json' |
| } |
| body = { |
| 'client_id': 'admin-cli', |
| 'grant_type': 'password', |
| 'username': username, |
| 'password': password |
| } |
| try: |
| response = requests.post(url, verify=False, auth=( |
| username, password), data=body, headers=headers) |
| except requests.exceptions.Timeout: |
| sys.exit('HTTP request failed, please check you internet connection.') |
| except requests.exceptions.TooManyRedirects: |
| sys.exit('HTTP request failed, please check your proxy settings.') |
| except requests.exceptions.RequestException as e: |
| # catastrophic error. bail. |
| raise SystemExit(e) |
| |
| if response.status_code >= 200 and response.status_code < 300: |
| print('Got token!') |
| return response.json()['access_token'] |
| else: |
| sys.exit('Getting token failed.') |
| |
| # create the default realm from file |
| |
| |
| def createRealm(token, realm): |
| url = base + '/admin/realms' |
| auth = 'bearer ' + token |
| headers = { |
| 'content-type': 'application/json', |
| 'accept': 'application/json', |
| 'authorization': auth |
| } |
| try: |
| response = requests.post( |
| url, verify=False, json=realm, headers=headers) |
| except requests.exceptions.Timeout: |
| sys.exit('HTTP request failed, please check you internet connection.') |
| except requests.exceptions.TooManyRedirects: |
| sys.exit('HTTP request failed, please check your proxy settings.') |
| except requests.exceptions.RequestException as e: |
| # catastrophic error. bail. |
| raise SystemExit(e) |
| |
| return response.status_code >= 200 and response.status_code < 300 |
| |
| # Check if default realm exists |
| |
| |
| def checkRealmExists(token, realmId): |
| url = base + '/admin/realms/' + realmId |
| auth = 'bearer ' + token |
| headers = { |
| 'accept': 'application/json', |
| 'authorization': auth |
| } |
| try: |
| response = requests.get(url, verify=False, headers=headers) |
| except requests.exceptions.Timeout: |
| sys.exit('HTTP request failed, please check you internet connection.') |
| except requests.exceptions.TooManyRedirects: |
| sys.exit('HTTP request failed, please check your proxy settings.') |
| except requests.exceptions.RequestException as e: |
| # catastrophic error. bail. |
| raise SystemExit(e) |
| |
| if response.status_code >= 200 and response.status_code < 300: |
| return realmId == response.json()['id'] |
| else: |
| # sys.exit('Getting realm failed.') |
| return False |
| |
| # create a user in default realm |
| |
| |
| def createUser(token, realmConfig, user): |
| realmId = realmConfig['id'] |
| url = base + '/admin/realms/' + realmId + '/users' |
| auth = 'bearer ' + token |
| headers = { |
| 'accept': 'application/json', |
| 'authorization': auth |
| } |
| try: |
| response = requests.post(url, verify=False, json=user, headers=headers) |
| except requests.exceptions.Timeout: |
| sys.exit('HTTP request failed, please check you internet connection.') |
| except requests.exceptions.TooManyRedirects: |
| sys.exit('HTTP request failed, please check your proxy settings.') |
| except requests.exceptions.RequestException as e: |
| # catastrophic error. bail. |
| raise SystemExit(e) |
| |
| if response.status_code >= 200 and response.status_code < 300: |
| print('User', user['username'], 'created!') |
| else: |
| print('User creation', user['username'], 'failed!\n', response.text) |
| |
| # creates User accounts in realm based a file |
| |
| |
| def createUsers(token, realmConfig, authConfig): |
| for user in authConfig['users']: |
| createUser(token, realmConfig, user) |
| |
| # create a user based on system user |
| systemUser = { |
| "firstName": getpass.getuser(), |
| "lastName": "", |
| "email": getpass.getuser() + "@sdnr.onap.org", |
| "enabled": "true", |
| "username": getpass.getuser(), |
| "credentials": [ |
| { |
| "type": "password", |
| "value": password, |
| "temporary": False |
| } |
| ] |
| } |
| createUser(token, realmConfig, systemUser) |
| |
| # Grants a role to a user |
| |
| |
| def addUserRole(user: dict, role: dict, options: dict): |
| url = options['url'] + '/' + user['id'] + '/role-mappings/realm' |
| try: |
| response = requests.post(url, verify=False, json=[ |
| {'id': role['id'], 'name':role['name']}], headers=options['headers']) |
| except requests.exceptions.Timeout: |
| sys.exit('HTTP request failed, please check you internet connection.') |
| except requests.exceptions.TooManyRedirects: |
| sys.exit('HTTP request failed, please check your proxy settings.') |
| except requests.exceptions.RequestException as e: |
| # catastrophic error. bail. |
| raise SystemExit(e) |
| |
| if response.status_code >= 200 and response.status_code < 300: |
| print('User role', user['username'], role['name'], 'created!') |
| else: |
| print('Creation of user role', |
| user['username'], role['name'], 'failed!\n', response.text) |
| |
| # searches for the role of a given user |
| |
| |
| def findRole(username: str, authConfig: dict, realmConfig: dict) -> dict: |
| roleName = 'administration' |
| for grant in authConfig['grants']: |
| if grant['username'] == username: |
| roleName = grant['role'] |
| for role in realmConfig['roles']['realm']: |
| if role['name'] == roleName: |
| return role |
| return None |
| |
| # adds roles to users |
| |
| |
| def addUserRoles(token, realmConfig, authConfig): |
| realmId = realmConfig['id'] |
| url = base + '/admin/realms/' + realmId + '/users' |
| auth = 'bearer ' + token |
| headers = { |
| 'content-type': 'application/json', |
| 'accept': 'application/json', |
| 'authorization': auth |
| } |
| try: |
| response = requests.get(url, verify=False, headers=headers) |
| except requests.exceptions.Timeout: |
| sys.exit('HTTP request failed, please check you internet connection.') |
| except requests.exceptions.TooManyRedirects: |
| sys.exit('HTTP request failed, please check your proxy settings.') |
| except requests.exceptions.RequestException as e: |
| # catastrophic error. bail. |
| raise SystemExit(e) |
| |
| if response.status_code >= 200 and response.status_code < 300: |
| users = response.json() |
| options = { |
| "url": url, |
| "auth": auth, |
| "headers": headers |
| } |
| for user in users: |
| role = findRole(user['username'], authConfig, realmConfig) |
| addUserRole(user, role, options) |
| else: |
| sys.exit('Getting users failed.') |
| |
| # main |
| |
| |
| (realmFile, authFile, readyTimeout) = load_arguments(sys.argv) |
| username = get_environment_variable('ADMIN_USERNAME') |
| password = get_environment_variable('ADMIN_PASSWORD') |
| base = getBaseUrl() |
| isReady(readyTimeout) |
| token = getToken() |
| if token: |
| with open(realmFile) as file: |
| realmConfig = json.load(file) |
| if not checkRealmExists(token, realmConfig['id']): |
| createRealm(token, realmConfig) |
| |
| with open(authFile) as authConfig: |
| authConfig = json.load(authConfig) |
| createUsers(token, realmConfig, authConfig) |
| addUserRoles(token, realmConfig, authConfig) |
| exit(0) |
| exit(1) |