forked from baron/baron-sso
189 lines
7.2 KiB
Python
189 lines
7.2 KiB
Python
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from http import cookiejar
|
|
import json
|
|
import os
|
|
import threading
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
CLIENT_ID = os.environ["CLIENT_ID"]
|
|
SUBJECT = os.environ["SUBJECT"]
|
|
REDIRECT_URI = os.environ["REDIRECT_URI"]
|
|
SCOPE = os.environ["SCOPE"]
|
|
STATE = os.environ["STATE"]
|
|
NONCE = os.environ["NONCE"]
|
|
ADMIN_BASE = os.environ.get("HYDRA_ADMIN_URL", "http://127.0.0.1:4445")
|
|
PUBLIC_BASE = os.environ.get("HYDRA_PUBLIC_URL", "http://127.0.0.1:4444")
|
|
|
|
|
|
def _put_json(url: str, payload: dict) -> dict:
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(url, data=data, method="PUT")
|
|
req.add_header("Content-Type", "application/json")
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
body = resp.read().decode("utf-8")
|
|
return json.loads(body) if body else {}
|
|
|
|
|
|
def accept_login(challenge: str) -> str:
|
|
url = f"{ADMIN_BASE}/oauth2/auth/requests/login/accept?login_challenge={urllib.parse.quote(challenge)}"
|
|
payload = {"subject": SUBJECT, "remember": True, "remember_for": 3600}
|
|
data = _put_json(url, payload)
|
|
return data.get("redirect_to", "")
|
|
|
|
|
|
def accept_consent(challenge: str) -> str:
|
|
url = f"{ADMIN_BASE}/oauth2/auth/requests/consent/accept?consent_challenge={urllib.parse.quote(challenge)}"
|
|
payload = {"grant_scope": ["openid", "profile", "email"], "remember": True, "remember_for": 3600}
|
|
data = _put_json(url, payload)
|
|
return data.get("redirect_to", "")
|
|
|
|
|
|
def _location_from_response(url: str, cookie_header: str | None) -> str:
|
|
req = urllib.request.Request(url, method="GET")
|
|
if cookie_header:
|
|
req.add_header("Cookie", cookie_header)
|
|
opener = urllib.request.build_opener(NoRedirect())
|
|
try:
|
|
opener.open(req, timeout=5)
|
|
except urllib.error.HTTPError as err:
|
|
return err.headers.get("Location", "")
|
|
return ""
|
|
|
|
|
|
class NoRedirect(urllib.request.HTTPRedirectHandler):
|
|
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
|
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
def do_GET(self):
|
|
parsed = urllib.parse.urlparse(self.path)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
login_challenge = (params.get("login_challenge") or [""])[0]
|
|
consent_challenge = (params.get("consent_challenge") or [""])[0]
|
|
login_verifier = (params.get("login_verifier") or [""])[0]
|
|
consent_verifier = (params.get("consent_verifier") or [""])[0]
|
|
|
|
if parsed.path == "/oauth2/auth" and consent_verifier:
|
|
query = urllib.parse.urlencode({
|
|
"consent_verifier": consent_verifier,
|
|
"client_id": (params.get("client_id") or [""])[0],
|
|
"redirect_uri": (params.get("redirect_uri") or [""])[0],
|
|
"response_type": (params.get("response_type") or [""])[0],
|
|
"scope": (params.get("scope") or [""])[0],
|
|
"state": (params.get("state") or [""])[0],
|
|
"nonce": (params.get("nonce") or [""])[0],
|
|
})
|
|
public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}"
|
|
location = _location_from_response(public_url, self.headers.get("Cookie"))
|
|
print(f"consent_verifier_location={location}")
|
|
if not location:
|
|
self.send_response(400)
|
|
self.end_headers()
|
|
self.wfile.write(b"missing redirect location")
|
|
return
|
|
self.send_response(302)
|
|
self.send_header("Location", location)
|
|
self.end_headers()
|
|
return
|
|
|
|
if parsed.path == "/oauth2/auth" and login_verifier:
|
|
query = urllib.parse.urlencode({
|
|
"login_verifier": login_verifier,
|
|
"client_id": (params.get("client_id") or [""])[0],
|
|
"redirect_uri": (params.get("redirect_uri") or [""])[0],
|
|
"response_type": (params.get("response_type") or [""])[0],
|
|
"scope": (params.get("scope") or [""])[0],
|
|
"state": (params.get("state") or [""])[0],
|
|
"nonce": (params.get("nonce") or [""])[0],
|
|
})
|
|
public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}"
|
|
location = _location_from_response(public_url, self.headers.get("Cookie"))
|
|
print(f"login_verifier_location={location}")
|
|
if not location:
|
|
self.send_response(400)
|
|
self.end_headers()
|
|
self.wfile.write(b"missing redirect location")
|
|
return
|
|
consent_challenge = urllib.parse.parse_qs(urllib.parse.urlparse(location).query).get(
|
|
"consent_challenge",
|
|
[""],
|
|
)[0]
|
|
if not consent_challenge:
|
|
self.send_response(400)
|
|
self.end_headers()
|
|
self.wfile.write(f"missing consent_challenge location={location}".encode("utf-8"))
|
|
return
|
|
redirect_to = accept_consent(consent_challenge)
|
|
if not redirect_to:
|
|
self.send_response(500)
|
|
self.end_headers()
|
|
self.wfile.write(b"consent accept failed")
|
|
return
|
|
self.send_response(302)
|
|
self.send_header("Location", redirect_to)
|
|
self.end_headers()
|
|
return
|
|
|
|
if login_challenge:
|
|
redirect_to = accept_login(login_challenge)
|
|
elif consent_challenge:
|
|
redirect_to = accept_consent(consent_challenge)
|
|
else:
|
|
redirect_to = ""
|
|
|
|
if not redirect_to:
|
|
self.send_response(400)
|
|
self.end_headers()
|
|
self.wfile.write(b"missing challenge")
|
|
return
|
|
|
|
self.send_response(302)
|
|
self.send_header("Location", redirect_to)
|
|
self.end_headers()
|
|
|
|
def log_message(self, format, *args):
|
|
return
|
|
|
|
|
|
class StopAtRedirect(urllib.request.HTTPRedirectHandler):
|
|
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
|
if newurl.startswith(REDIRECT_URI):
|
|
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
|
|
return super().redirect_request(req, fp, code, msg, headers, newurl)
|
|
|
|
|
|
def main():
|
|
server = HTTPServer(("127.0.0.1", 3000), Handler)
|
|
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
|
thread.start()
|
|
|
|
encoded_redirect = urllib.parse.quote(REDIRECT_URI, safe="")
|
|
encoded_scope = urllib.parse.quote(SCOPE, safe="")
|
|
auth_url = (
|
|
f"{PUBLIC_BASE}/oauth2/auth?response_type=code"
|
|
f"&client_id={CLIENT_ID}"
|
|
f"&redirect_uri={encoded_redirect}"
|
|
f"&scope={encoded_scope}"
|
|
f"&state={STATE}"
|
|
f"&nonce={NONCE}"
|
|
)
|
|
|
|
jar = cookiejar.CookieJar()
|
|
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar), StopAtRedirect())
|
|
try:
|
|
opener.open(auth_url, timeout=10)
|
|
except urllib.error.HTTPError as err:
|
|
body = err.read().decode("utf-8") if hasattr(err, "read") else ""
|
|
print(f"error_url={err.geturl()}")
|
|
print(f"error_code={err.code}")
|
|
if body:
|
|
print(f"error_body={body}")
|
|
finally:
|
|
server.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|