Uses the Microsoft Graph API to retrieve a calendar and convert it to Org-mode-compatible plain text.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

216 lines
6.5 KiB

# Copyright 2020 Harish Krupo
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from xdg.BaseDirectory import (
load_first_config,
save_data_path
)
import argparse
import webbrowser
import logging
import json
import msal
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs, urlencode
from wsgiref import simple_server
import wsgiref.util
import sys
import uuid
import pprint
import os
import atexit
import base64
import gnupg
import io
# Name passed to the XDG lookup when searching for config.json.
APP_NAME = "oauth2ms"
# Body returned to the browser by the redirect-handling WSGI app.
SUCCESS_MESSAGE = "Authorization complete."
# Module-level logger.
_LOGGER = logging.getLogger(__name__)
# Pretty-printer with a 4-space indent.
pp = pprint.PrettyPrinter(indent=4)
# File in which the configuration and the serialized token cache are stored.
credentials_file = f'{save_data_path("oauth2ms")}/credentials.bin'
def load_config():
    """Load the application configuration from the XDG config directories.

    Looks up ``config.json`` for ``APP_NAME`` through ``load_first_config``.

    Returns:
        The parsed JSON content of the configuration file, or ``None`` (after
        printing a hint to stdout) when no configuration file was found.
    """
    config_file = load_first_config(APP_NAME, "config.json")
    if config_file is None:
        print(f"Couldn't find configuration file. Config file must be at: $XDG_CONFIG_HOME/{APP_NAME}/config.json")
        print(f"Current value of $XDG_CONFIG_HOME is {os.getenv('XDG_CONFIG_HOME')}")
        return None
    # A context manager closes the file handle deterministically instead of
    # leaving it to the garbage collector.
    with open(config_file, "r") as handle:
        return json.load(handle)
def build_msal_app(config, cache=None):
    """Create an MSAL confidential client application from the configuration.

    Args:
        config: Mapping providing ``client_id``, ``tenant_id`` and
            ``client_secret``.
        cache: Optional token cache handed to MSAL as ``token_cache``.

    Returns:
        A ``msal.ConfidentialClientApplication`` instance.
    """
    client_id = config["client_id"]
    authority_url = "https://login.microsoftonline.com/" + config["tenant_id"]
    secret = config["client_secret"]
    return msal.ConfidentialClientApplication(
        client_id,
        authority=authority_url,
        client_credential=secret,
        token_cache=cache,
    )
def get_auth_url(config, cache, state):
    """Build the URL the user must visit to authorize the application.

    Args:
        config: Mapping providing ``scopes`` and ``redirect_uri`` in addition
            to the keys read by ``build_msal_app``.
        cache: Token cache passed through to ``build_msal_app``.
        state: Value forwarded as the ``state`` argument of the request.

    Returns:
        Whatever ``get_authorization_request_url`` of the MSAL app returns.
    """
    msal_app = build_msal_app(config, cache)
    return msal_app.get_authorization_request_url(
        config["scopes"],
        state=state,
        redirect_uri=config["redirect_uri"],
    )
class WSGIRequestHandler(simple_server.WSGIRequestHandler):
    """Request handler that keeps the HTTP server quiet."""

    def log_message(self, format, *args):
        """Ignore the log line instead of emitting it."""
        return None
class WSGIRedirectionApp:
    """Minimal WSGI application that receives the authorization redirect.

    The URI of the most recent request is kept in ``last_request_uri`` and
    the message given at construction time is sent back as plain text.
    """

    def __init__(self, message):
        self._success_message = message
        self.last_request_uri = None

    def __call__(self, environ, start_response):
        headers = [("Content-type", "text/plain; charset=utf-8")]
        start_response("200 OK", headers)
        # Remember the full request URI (including the query string) so the
        # caller can inspect it after the request has been served.
        self.last_request_uri = wsgiref.util.request_uri(environ)
        body = self._success_message.encode("utf-8")
        return [body]
def validate_config(config):
    """Check that the configuration holds every key the authorization flow reads.

    Besides the base keys (``tenant_id``, ``client_id``, ``scopes``) the
    authorization-code flow in this file also reads ``redirect_host``,
    ``redirect_port``, ``redirect_path`` and ``client_secret``. Previously
    the latter set was collected but never checked, so a config lacking them
    passed validation and failed later with a ``KeyError``.

    Args:
        config: The loaded configuration (supports the ``in`` operator).

    Returns:
        ``True`` when all required keys are present; otherwise ``False``
        after printing an explanation to stdout.
    """
    base_keys = ("tenant_id", "client_id", "scopes")
    auth_keys = ("redirect_host", "redirect_port", "redirect_path", "client_secret")

    if not all(key in config for key in base_keys):
        print("Invalid config")
        print("Config must contain the keys: " + ", ".join(base_keys))
        return False

    missing_auth = [key for key in auth_keys if key not in config]
    if missing_auth:
        print("Invalid config")
        print("Config is missing the keys required for authorization: "
              + ", ".join(missing_auth))
        return False

    return True
def build_new_app_state():
    """Run the interactive authorization-code flow and build a fresh app state.

    Loads and validates the configuration, opens the authorization URL in a
    web browser, serves a single HTTP request on the configured redirect
    host/port to capture the redirect, and exchanges the returned code for a
    token through MSAL.

    Returns:
        A tuple ``(app, token)`` where ``app`` is a dict with the keys
        ``"config"`` and ``"cache"`` and ``token`` is the access token.
        On any failure ``(None, None)`` is returned, so callers that unpack
        the result into two names do not raise.
    """
    failure = (None, None)
    cache = msal.SerializableTokenCache()
    config = load_config()
    if config is None or not validate_config(config):
        return failure

    state = str(uuid.uuid4())
    redirect_path = config["redirect_path"]
    # Remove / at the end if present, and path is not just /
    # (endswith also copes with an empty path, where [-1] would raise).
    if redirect_path != "/" and redirect_path.endswith("/"):
        redirect_path = redirect_path[:-1]
    # str() tolerates a port written as a JSON number instead of a string.
    redirect_port = str(config["redirect_port"])
    config["redirect_uri"] = (
        "http://" + config["redirect_host"] + ":" + redirect_port + redirect_path
    )
    auth_url = get_auth_url(config, cache, state)

    wsgi_app = WSGIRedirectionApp(SUCCESS_MESSAGE)
    http_server = simple_server.make_server(
        config["redirect_host"],
        int(config["redirect_port"]),
        wsgi_app,
        handler_class=WSGIRequestHandler,
    )
    try:
        webbrowser.open(auth_url, new=2, autoraise=True)
        # Serve exactly one request: the redirect carrying the auth code.
        http_server.handle_request()
    finally:
        # Release the listening socket even if the browser or handler fails.
        http_server.server_close()

    auth_code = _extract_auth_code(wsgi_app.last_request_uri, state)
    if auth_code is None:
        return failure

    result = build_msal_app(config, cache).acquire_token_by_authorization_code(
        auth_code,
        scopes=config["scopes"],
        redirect_uri=config["redirect_uri"],
    )
    token = result.get("access_token")
    if token is None:
        print("Something went wrong during authorization")
        print(f"Server returned: {result}")
        return failure

    return {"config": config, "cache": cache}, token


def _extract_auth_code(request_uri, expected_state):
    """Return the ``code`` query parameter of the redirect URI, or ``None``.

    ``None`` is returned when no request was captured, when the URI carries
    no ``code`` parameter, or when its ``state`` parameter differs from
    ``expected_state``.
    """
    if not request_uri:
        return None
    # Parse only the query component; parse_qs on the whole URI would glue
    # the scheme/host/path onto the name of the first parameter.
    query = parse_qs(urlparse(request_uri).query)
    codes = query.get("code")
    if not codes:
        return None
    # Reject redirects that do not echo the state value sent with the request.
    if query.get("state", [None])[0] != expected_state:
        return None
    # parse_qs yields lists; hand back the single code string.
    return codes[0]
def fetch_token_from_cache(app):
    """Obtain an access token silently for the first cached account.

    Args:
        app: Dict with the keys ``"config"`` and ``"cache"``.

    Returns:
        The access token, or ``None`` when the cache holds no account or the
        silent acquisition did not yield a token.
    """
    config = app["config"]
    cache = app["cache"]
    cca = build_msal_app(config, cache)
    accounts = cca.get_accounts()
    # Test the returned list, not the bound method (which is always truthy),
    # so an empty cache does not end in an IndexError on accounts[0].
    if not accounts:
        return None
    result = cca.acquire_token_silent(config["scopes"], account=accounts[0])
    # A falsy result means no token could be produced from the cache.
    if not result:
        return None
    return result.get("access_token")
def build_app_state_from_credentials():
    """Rebuild the app state from the saved credentials file.

    Returns:
        A dict with the keys ``"config"`` (the parsed credentials file) and
        ``"cache"`` (a ``msal.SerializableTokenCache`` deserialized from its
        ``token_cache`` entry), or ``None`` when the file does not exist or
        contains no ``token_cache`` entry.

    Exits the process with status 1 when the file cannot be decoded as JSON.
    """
    # If the file is missing return None
    if not os.path.exists(credentials_file):
        return None
    # Make sure it is a valid json object. Both json.JSONDecodeError and
    # UnicodeDecodeError (undecodable, e.g. encrypted, content) are ValueError
    # subclasses, so either lands on the message below.
    try:
        with open(credentials_file, "r") as handle:
            config = json.loads(handle.read())
    except ValueError:
        print("Not a valild json file or it is ecrypted. Maybe add/remove the -e arugment?")
        sys.exit(1)
    # We don't have a token cache?
    if config.get("token_cache") is None:
        return None
    cache = msal.SerializableTokenCache()
    cache.deserialize(config["token_cache"])
    return {"config": config, "cache": cache}
# Prefer the saved credentials; fall back to the interactive flow otherwise.
token = None
app_state = build_app_state_from_credentials()
if app_state is None:
    # Guard against a None result so the two-name unpacking never raises.
    app_state, token = build_new_app_state() or (None, None)
if app_state is None:
    print("Something went wrong!")
    sys.exit(1)
if token is None:
    token = fetch_token_from_cache(app_state)
cache = app_state["cache"]
# Persist the configuration together with the token cache when it changed.
if cache.has_state_changed:
    config = app_state["config"]
    config["token_cache"] = cache.serialize()
    config_json = json.dumps(config)
    # The file holds secrets: create it readable by the owner only (the mode
    # applies when the file is newly created) and close the handle promptly.
    fd = os.open(credentials_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(config_json)