# Copyright 2020 Harish Krupo
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from xdg.BaseDirectory import (
|
|
load_first_config,
|
|
save_data_path
|
|
)
|
|
|
|
import argparse
|
|
import webbrowser
|
|
import logging
|
|
import json
|
|
import msal
|
|
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from urllib.parse import urlparse, parse_qs, urlencode
|
|
from wsgiref import simple_server
|
|
import wsgiref.util
|
|
import sys
|
|
import uuid
|
|
import pprint
|
|
import os
|
|
import atexit
|
|
import base64
|
|
import gnupg
|
|
import io
|
|
|
|
pp = pprint.PrettyPrinter(indent=4)
|
|
|
|
credentials_file = save_data_path("oauth2ms") + "/credentials.bin"
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
APP_NAME = "oauth2ms"
|
|
SUCCESS_MESSAGE = "Authorization complete."
|
|
|
|
|
|
def load_config():
|
|
config_file = load_first_config(APP_NAME, "config.json")
|
|
if config_file is None:
|
|
print(f"Couldn't find configuration file. Config file must be at: $XDG_CONFIG_HOME/{APP_NAME}/config.json")
|
|
print("Current value of $XDG_CONFIG_HOME is {}".format(os.getenv("XDG_CONFIG_HOME")))
|
|
return None
|
|
return json.load(open(config_file, 'r'))
|
|
|
|
def build_msal_app(config, cache = None):
|
|
return msal.ConfidentialClientApplication(
|
|
config['client_id'],
|
|
authority="https://login.microsoftonline.com/" + config['tenant_id'],
|
|
client_credential=config['client_secret'], token_cache=cache)
|
|
|
|
def get_auth_url(config, cache, state):
|
|
return build_msal_app(config, cache).get_authorization_request_url(
|
|
config['scopes'],
|
|
state=state,
|
|
redirect_uri=config["redirect_uri"]);
|
|
|
|
class WSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
|
|
"""Silence out the messages from the http server"""
|
|
def log_message(self, format, *args):
|
|
pass
|
|
|
|
class WSGIRedirectionApp(object):
|
|
"""WSGI app to handle the authorization redirect.
|
|
|
|
Stores the request URI and displays the given success message.
|
|
"""
|
|
|
|
def __init__(self, message):
|
|
self.last_request_uri = None
|
|
self._success_message = message
|
|
|
|
def __call__(self, environ, start_response):
|
|
start_response("200 OK", [("Content-type", "text/plain; charset=utf-8")])
|
|
self.last_request_uri = wsgiref.util.request_uri(environ)
|
|
return [self._success_message.encode("utf-8")]
|
|
|
|
def validate_config(config):
|
|
conditionsBoth = [
|
|
"tenant_id" in config,
|
|
"client_id" in config,
|
|
"scopes" in config,
|
|
]
|
|
|
|
conditionsAuth = [
|
|
"redirect_host" in config,
|
|
"redirect_port" in config,
|
|
"redirect_path" in config,
|
|
"client_secret" in config
|
|
]
|
|
|
|
if not all(conditionsBoth):
|
|
print("Invalid config")
|
|
print("Config must contain the keys: " +
|
|
"tenant_id, client_id, scopes");
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
def build_new_app_state():
|
|
cache = msal.SerializableTokenCache()
|
|
config = load_config()
|
|
if config is None:
|
|
return None
|
|
|
|
if not validate_config(config):
|
|
return None
|
|
|
|
|
|
state = str(uuid.uuid4())
|
|
redirect_path = config["redirect_path"]
|
|
# Remove / at the end if present, and path is not just /
|
|
if redirect_path != "/" and redirect_path[-1] == "/":
|
|
redirect_path = redirect_path[:-1]
|
|
|
|
config["redirect_uri"] = "http://" + config['redirect_host'] + ":" + config['redirect_port'] + redirect_path
|
|
auth_url = get_auth_url(config, cache, state)
|
|
wsgi_app = WSGIRedirectionApp(SUCCESS_MESSAGE)
|
|
http_server = simple_server.make_server(config['redirect_host'],
|
|
int(config['redirect_port']),
|
|
wsgi_app,
|
|
handler_class=WSGIRequestHandler)
|
|
|
|
webbrowser.open(auth_url, new=2, autoraise=True)
|
|
|
|
http_server.handle_request()
|
|
|
|
auth_response = wsgi_app.last_request_uri
|
|
http_server.server_close()
|
|
parsed_auth_response = parse_qs(auth_response)
|
|
|
|
code_key = config["redirect_uri"] + "?code"
|
|
if code_key not in parsed_auth_response:
|
|
return None
|
|
|
|
auth_code = parsed_auth_response[code_key]
|
|
|
|
result = build_msal_app(config, cache).acquire_token_by_authorization_code(
|
|
auth_code,
|
|
scopes=config['scopes'],
|
|
redirect_uri=config["redirect_uri"]);
|
|
|
|
if result.get("access_token") is None:
|
|
print("Something went wrong during authorization")
|
|
print(f"Server returned: {result}")
|
|
return None
|
|
|
|
token = result["access_token"]
|
|
app = {}
|
|
app['config'] = config
|
|
app['cache'] = cache
|
|
return app, token
|
|
|
|
def fetch_token_from_cache(app):
|
|
config = app['config']
|
|
cache = app['cache']
|
|
cca = build_msal_app(config, cache)
|
|
accounts = cca.get_accounts()
|
|
if cca.get_accounts:
|
|
result = cca.acquire_token_silent(config['scopes'], account=accounts[0])
|
|
return result["access_token"]
|
|
else:
|
|
None
|
|
|
|
def build_app_state_from_credentials():
|
|
# If the file is missing return None
|
|
if not os.path.exists(credentials_file):
|
|
return None
|
|
|
|
credentials = open(credentials_file, "r").read()
|
|
|
|
# Make sure it is a valid json object
|
|
try:
|
|
config = json.loads(credentials)
|
|
except:
|
|
print("Not a valild json file or it is ecrypted. Maybe add/remove the -e arugment?")
|
|
sys.exit(1)
|
|
|
|
# We don't have a token cache?
|
|
if config.get("token_cache") is None:
|
|
return None
|
|
|
|
app_state = {};
|
|
cache = msal.SerializableTokenCache()
|
|
cache.deserialize(config["token_cache"])
|
|
app_state["config"] = config
|
|
app_state["cache"] = cache
|
|
return app_state
|
|
|
|
token = None
|
|
app_state = build_app_state_from_credentials()
|
|
if app_state is None:
|
|
app_state, token = build_new_app_state()
|
|
|
|
if app_state is None:
|
|
print("Something went wrong!")
|
|
sys.exit(1)
|
|
|
|
if token is None:
|
|
token = fetch_token_from_cache(app_state)
|
|
|
|
cache = app_state['cache']
|
|
if cache.has_state_changed:
|
|
config = app_state["config"]
|
|
config["token_cache"] = cache.serialize()
|
|
config_json = json.dumps(config)
|
|
open(credentials_file, "w").write(config_json)
|