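Introduction
OAuth 2.0 is the industry-standard authorization framework: it lets a third-party application access a user's resources with limited scope, without ever exposing the user's password. OpenID Connect (OIDC) is an identity layer built on top of OAuth 2.0. This article walks through the main OAuth flows and the core mechanisms of OIDC.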
OAuth 2.0 Basics
Roles
graph TB
RO[Resource Owner<br>the end user] --> CLIENT[Client<br>third-party application<br>such as a mobile app]
CLIENT --> AS[Authorization Server<br>such as Google OAuth]
AS --> RS[Resource Server<br>such as Google API]
RO -.->|owns resources| RS
style RO fill:#1976d2,color:#fff
style CLIENT fill:#388e3c,color:#fff
style AS fill:#f57c00,color:#fff
style RS fill:#7b1fa2,color:#fff
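Resource Owner: the user who owns the protected resources
Client: the third-party application that wants to access the user's resources
Authorization Server: the server that issues tokens
Resource Server: the server that hosts the protected resources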
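Authorization Code Flow
The most secure and most commonly recommended OAuth flow, suited to web applications that have a backend: the tokens are exchanged server to server and never pass through the browser.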
sequenceDiagram
participant U as User (Browser)
participant C as Client (Backend)
participant AS as Auth Server
participant RS as Resource Server
U->>C: 1. Click "Sign in with Google"
C->>U: 2. Redirect to the authorization endpoint
U->>AS: 3. GET /authorize?<br>response_type=code&<br>client_id=xxx&<br>redirect_uri=xxx&<br>scope=openid profile&<br>state=random123
AS->>U: 4. Show the login and consent page
U->>AS: 5. User grants consent
AS->>U: 6. Redirect to redirect_uri?code=AUTH_CODE&state=random123
U->>C: 7. Callback request (carries code)
Note over C: 8. Validate state to prevent CSRF
C->>AS: 9. POST /token<br>grant_type=authorization_code&<br>code=AUTH_CODE&<br>redirect_uri=xxx&<br>client_id=xxx&<br>client_secret=xxx
AS->>C: 10. Return access_token + refresh_token
C->>RS: 11. GET /api/user<br>Authorization: Bearer access_token
RS->>C: 12. Return the user's resources
import secrets
from urllib.parse import urlencode

import requests
from flask import Flask, redirect, request, session

app = Flask(__name__)
# Flask signs the session cookie with this key; replace the placeholder in real use.
app.secret_key = "replace-with-a-random-secret"

OAUTH_CONFIG = {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "authorize_url": "https://accounts.google.com/o/oauth2/v2/auth",
    "token_url": "https://oauth2.googleapis.com/token",
    "redirect_uri": "https://yourapp.com/callback",
    "scope": "openid email profile",
}


@app.route("/login")
def login():
    # Random state, kept in the session so the callback can detect CSRF
    state = secrets.token_urlsafe(32)
    session["oauth_state"] = state

    params = {
        "response_type": "code",
        "client_id": OAUTH_CONFIG["client_id"],
        "redirect_uri": OAUTH_CONFIG["redirect_uri"],
        "scope": OAUTH_CONFIG["scope"],
        "state": state,
        "access_type": "offline",  # Google-specific: request a refresh token
        "prompt": "consent",
    }
    auth_url = f"{OAUTH_CONFIG['authorize_url']}?{urlencode(params)}"
    return redirect(auth_url)


@app.route("/callback")
def callback():
    # Reject the callback if no state was stored or the values differ
    expected_state = session.pop("oauth_state", None)
    if not expected_state or request.args.get("state") != expected_state:
        return "Invalid state", 403

    code = request.args.get("code")
    if not code:
        return "Missing authorization code", 400

    # Exchange the authorization code for tokens (server to server)
    token_response = requests.post(OAUTH_CONFIG["token_url"], data={
        "grant_type": "authorization_code",
        "code": code,
        "client_id": OAUTH_CONFIG["client_id"],
        "client_secret": OAUTH_CONFIG["client_secret"],
        "redirect_uri": OAUTH_CONFIG["redirect_uri"],
    })
    token_response.raise_for_status()
    tokens = token_response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens.get("refresh_token")  # store securely for later renewal

    # Fetch the user's profile with the access token
    user_info = requests.get(
        "https://www.googleapis.com/oauth2/v3/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    ).json()
    return f"Welcome, {user_info['name']}!"
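The token response above includes a refresh_token. As a minimal sketch (not part of the original example), the backend could later renew an expired access token with the refresh_token grant from RFC 6749, section 6. The helper name build_refresh_request is hypothetical, and sending the client credentials in the form body simply mirrors the token request shown earlier; some servers expect HTTP Basic authentication instead.

```python
from urllib.parse import parse_qs, urlencode


def build_refresh_request(token_url, refresh_token, client_id, client_secret, scope=None):
    """Return (url, headers, body) for a refresh_token grant request.

    Hypothetical helper: it only builds the request, it does not send it.
    """
    form = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    # scope is optional; when present it must not exceed the originally granted scope
    if scope:
        form["scope"] = scope
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    return token_url, headers, urlencode(form)


url, headers, body = build_refresh_request(
    "https://oauth2.googleapis.com/token", "stored-refresh-token",
    "your_client_id", "your_client_secret",
)
print(parse_qs(body)["grant_type"])
```

The returned tuple can be passed to any HTTP client, for example requests.post(url, headers=headers, data=body).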
Authorization Code with PKCE
PKCE (Proof Key for Code Exchange) is a security extension designed for public clients (SPAs and mobile apps) that cannot keep a client_secret; the flow works without one.
sequenceDiagram
participant U as User
participant SPA as SPA/Mobile App
participant AS as Auth Server
Note over SPA: Generate code_verifier (random string)<br>Compute code_challenge = BASE64URL(SHA256(code_verifier))
SPA->>AS: GET /authorize?<br>response_type=code&<br>code_challenge=xxx&<br>code_challenge_method=S256
AS->>U: Login and consent page
U->>AS: Grant consent
AS->>SPA: redirect_uri?code=AUTH_CODE
SPA->>AS: POST /token<br>grant_type=authorization_code&<br>code=AUTH_CODE&<br>code_verifier=original random string
Note over AS: Verify BASE64URL(SHA256(code_verifier)) == the code_challenge sent earlier
AS->>SPA: access_token + refresh_token
import crypto from 'crypto';

class PKCEAuthFlow {
  // Random, URL-safe code_verifier (32 bytes -> 43 characters)
  generateCodeVerifier(): string {
    return crypto.randomBytes(32).toString('base64url');
  }

  // S256 challenge: BASE64URL(SHA256(code_verifier))
  generateCodeChallenge(verifier: string): string {
    return crypto.createHash('sha256').update(verifier).digest('base64url');
  }

  // Random state; the caller must keep it and compare it on the callback
  generateState(): string {
    return crypto.randomBytes(16).toString('hex');
  }

  // Build the authorization request URL
  buildAuthUrl(codeChallenge: string, state: string): string {
    const params = new URLSearchParams({
      response_type: 'code',
      client_id: 'your_client_id',
      redirect_uri: 'https://app.example.com/callback',
      scope: 'openid profile email',
      state,
      code_challenge: codeChallenge,
      code_challenge_method: 'S256',
    });
    return `https://auth.example.com/authorize?${params}`;
  }

  // Exchange the code for tokens; code_verifier takes the place of client_secret
  async exchangeToken(code: string, codeVerifier: string) {
    const response = await fetch('https://auth.example.com/token', {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'authorization_code',
        code,
        client_id: 'your_client_id',
        redirect_uri: 'https://app.example.com/callback',
        code_verifier: codeVerifier,
      }),
    });
    if (!response.ok) {
      throw new Error(`Token request failed: ${response.status}`);
    }
    return response.json();
  }
}
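The client code above covers only the SPA side. For the server-side check shown in the diagram, here is a minimal Python sketch using only the standard library. The function names are hypothetical, and a real authorization server would also bind the stored code_challenge to the authorization code and expire both together.

```python
import base64
import hashlib
import hmac
import secrets


def make_code_verifier() -> str:
    """32 random bytes encoded as unpadded base64url (43 characters)."""
    return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode("ascii")


def s256_challenge(code_verifier: str) -> str:
    """code_challenge for method S256: BASE64URL(SHA256(ASCII(code_verifier)))."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify_pkce(code_verifier: str, stored_challenge: str) -> bool:
    """Server side: recompute the challenge and compare in constant time."""
    return hmac.compare_digest(s256_challenge(code_verifier), stored_challenge)


verifier = make_code_verifier()
challenge = s256_challenge(verifier)      # sent with the /authorize request
print(verify_pkce(verifier, challenge))   # checked during the /token request
```

An attacker who intercepts the authorization code still cannot redeem it, because the code_verifier never leaves the legitimate client until the token request.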
Client Credentials Flow
Suited to service-to-service (M2M) communication where no user is involved.
sequenceDiagram
participant S as Service A
participant AS as Auth Server
participant RS as Service B (Resource)
S->>AS: POST /token<br>grant_type=client_credentials&<br>client_id=xxx&<br>client_secret=xxx&<br>scope=api:read
AS->>S: access_token (short-lived)
S->>RS: GET /api/data<br>Authorization: Bearer access_token
RS->>RS: Validate token
RS->>S: Return data
package auth

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// getM2MToken requests an access token with the client credentials grant.
func getM2MToken(clientID, clientSecret, tokenURL string) (string, error) {
	data := url.Values{
		"grant_type":    {"client_credentials"},
		"client_id":     {clientID},
		"client_secret": {clientSecret},
		"scope":         {"api:read api:write"},
	}

	resp, err := http.PostForm(tokenURL, data)
	if err != nil {
		return "", fmt.Errorf("token request failed: %w", err)
	}
	defer resp.Body.Close()

	// Treat any non-200 response as an error instead of decoding it as a token
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("token endpoint returned %s", resp.Status)
	}

	var result struct {
		AccessToken string `json:"access_token"`
		ExpiresIn   int    `json:"expires_in"`
		TokenType   string `json:"token_type"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("decode response failed: %w", err)
	}

	return result.AccessToken, nil
}
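Because these access tokens are short-lived, a calling service usually caches the token and requests a new one shortly before it expires rather than hitting the token endpoint on every call. The following is a minimal Python sketch of that idea, not something the original code does. The class name CachedToken, the fetch callable (assumed to return an (access_token, expires_in) pair) and the 30-second safety margin are all assumptions for illustration.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Caches an access token and refetches it `skew` seconds before expiry."""

    def __init__(self, fetch: Callable[[], Tuple[str, int]], skew: int = 30,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch      # hypothetical: returns (access_token, expires_in)
        self._skew = skew
        self._clock = clock      # injectable so the behaviour can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._skew:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token


# Usage with a stand-in fetch function; a real one would call the token endpoint.
cache = CachedToken(lambda: ("example-token", 300))
print(cache.get() == cache.get())  # the second call is served from the cache
```

This sketch is not thread-safe; a service with concurrent callers would need a lock around the refresh.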
Device Authorization Flow
Suited to input-constrained devices (smart TVs, IoT devices, CLI tools).
sequenceDiagram
participant D as Device (Smart TV)
participant U as User (Phone/PC)
participant AS as Auth Server
D->>AS: POST /device/code<br>client_id=xxx&scope=openid
AS->>D: device_code + user_code + verification_uri
Note over D: Display: visit https://auth.example.com/device<br>and enter code: WDJB-MJHT
D->>AS: Poll POST /token<br>grant_type=urn:ietf:params:oauth:grant-type:device_code&<br>device_code=xxx&<br>client_id=xxx
U->>AS: Open the verification URI and enter user_code
U->>AS: Log in and grant consent
AS->>D: access_token + refresh_token (poll succeeds)
Note over D: Device sign-in complete
import time

import requests


def device_auth_flow(client_id, auth_server):
    # Step 1: request a device code and a user code
    resp = requests.post(f"{auth_server}/device/code", data={
        "client_id": client_id,
        "scope": "openid profile",
    })
    device_data = resp.json()

    # Step 2: tell the user where to go.
    # verification_uri_complete is optional in RFC 8628, so do not assume it exists.
    complete_uri = device_data.get("verification_uri_complete")
    if complete_uri:
        print(f"Visit: {complete_uri}")
    print(f"To enter the code manually, visit {device_data['verification_uri']} "
          f"and enter: {device_data['user_code']}")

    # Step 3: poll the token endpoint
    interval = device_data.get("interval", 5)
    device_code = device_data["device_code"]

    while True:
        time.sleep(interval)

        token_resp = requests.post(f"{auth_server}/token", data={
            "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
            "device_code": device_code,
            "client_id": client_id,
        })
        result = token_resp.json()
        error = result.get("error")

        if "access_token" in result:
            return result
        elif error == "authorization_pending":
            continue  # the user has not finished yet
        elif error == "slow_down":
            interval += 5  # the server asked us to poll less often
        elif error == "expired_token":
            raise Exception("Device code expired")
        else:
            raise Exception(f"Error: {error}")
OpenID Connect (OIDC)
OIDC adds an identity layer on top of OAuth 2.0. OAuth 2.0 only handles authorization; OIDC adds authentication.
graph TB
subgraph OAUTH["OAuth 2.0"]
AUTH[Authorization]
AT[Access Token]
end
subgraph OIDC["OIDC (built on OAuth 2.0)"]
AUTHN[Authentication]
IDT["ID Token (JWT)"]
UI[UserInfo Endpoint]
DISC["Discovery<br>/.well-known/openid-configuration"]
end
AUTH --> AUTHN
AT --> IDT
AT --> UI
style OIDC fill:#e8f5e9
ID Token (JWT)
{
  "alg": "RS256",
  "kid": "1234567890",
  "typ": "JWT"
}
.
{
  "iss": "https://accounts.google.com",
  "sub": "110169484474386276334",
  "aud": "your_client_id",
  "exp": 1723456789,
  "iat": 1723453189,
  "nonce": "abc123",
  "at_hash": "HK6E_P6Dh8Y93mRNtsDB1Q",
  "email": "[email protected]",
  "email_verified": true,
  "name": "John Doe",
  "picture": "https://example.com/photo.jpg"
}
.
{signature}
ID Token Validation
import requests
from jose import jwt, JWTError


class IDTokenVerifier:
    def __init__(self, issuer, client_id):
        self.issuer = issuer
        self.client_id = client_id
        self.jwks = self._fetch_jwks()

    def _fetch_jwks(self):
        """Fetch the signing public keys via OIDC Discovery."""
        config_url = f"{self.issuer}/.well-known/openid-configuration"
        config = requests.get(config_url).json()
        jwks_uri = config["jwks_uri"]
        return requests.get(jwks_uri).json()

    def verify(self, id_token, nonce=None, access_token=None):
        """Validate an ID Token and return its claims."""
        try:
            # Checks the signature, exp, aud and iss. python-jose also checks
            # at_hash when the claim is present, so pass the access token that
            # was issued together with the ID Token.
            payload = jwt.decode(
                id_token,
                self.jwks,
                algorithms=["RS256"],
                audience=self.client_id,
                issuer=self.issuer,
                access_token=access_token,
            )

            # The nonce must match the value sent in the authorization request
            if nonce and payload.get("nonce") != nonce:
                raise ValueError("Invalid nonce")

            return payload
        except JWTError as e:
            raise ValueError(f"Invalid ID token: {e}")
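The at_hash claim in the example ID Token ties it to the access token issued alongside it. OpenID Connect Core defines it as the base64url encoding of the left-most half of the hash of the access token, using the hash that matches the ID Token's alg (SHA-256 for RS256). A minimal sketch of that calculation, with hypothetical function names and assuming RS256:

```python
import base64
import hashlib
import hmac


def compute_at_hash(access_token: str) -> str:
    """at_hash for RS256: base64url of the left half of SHA-256(access_token)."""
    digest = hashlib.sha256(access_token.encode("ascii")).digest()
    left_half = digest[: len(digest) // 2]  # 16 of the 32 bytes
    return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")


def at_hash_matches(access_token: str, claimed_at_hash: str) -> bool:
    """Compare the recomputed value with the claim in constant time."""
    return hmac.compare_digest(compute_at_hash(access_token), claimed_at_hash)


value = compute_at_hash("example-access-token")
print(len(value))  # 16 bytes encode to 22 base64url characters
```

Libraries that validate ID Tokens typically perform this check themselves; the sketch only shows what the claim means.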
UserInfo Endpoint
curl -H "Authorization: Bearer ACCESS_TOKEN" \
  https://openidconnect.googleapis.com/v1/userinfo

{
  "sub": "110169484474386276334",
  "name": "John Doe",
  "given_name": "John",
  "family_name": "Doe",
  "picture": "https://example.com/photo.jpg",
  "email": "[email protected]",
  "email_verified": true,
  "locale": "en"
}
OIDC Discovery
curl https://accounts.google.com/.well-known/openid-configuration | jq

{
  "issuer": "https://accounts.google.com",
  "authorization_endpoint": "https://accounts.google.com/o/oauth2/v2/auth",
  "token_endpoint": "https://oauth2.googleapis.com/token",
  "userinfo_endpoint": "https://openidconnect.googleapis.com/v1/userinfo",
  "revocation_endpoint": "https://oauth2.googleapis.com/revoke",
  "jwks_uri": "https://www.googleapis.com/oauth2/v3/certs",
  "response_types_supported": ["code", "token", "id_token", ...],
  "scopes_supported": ["openid", "email", "profile"],
  "id_token_signing_alg_values_supported": ["RS256"]
}
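A client that loads this document should confirm it belongs to the provider it expects before trusting the endpoints in it. The sketch below shows one possible check in Python: the issuer value must equal the issuer the client was configured with, and the endpoints should use HTTPS. The function name and the choice of which keys to require are assumptions for illustration, not a complete list of what the Discovery specification mandates.

```python
# Subset of discovery fields this sketch insists on (an assumption, not the full spec list)
REQUIRED_KEYS = ("issuer", "authorization_endpoint", "token_endpoint", "jwks_uri")


def check_discovery(config: dict, expected_issuer: str) -> dict:
    """Return the required endpoints, or raise ValueError if the document looks wrong."""
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"discovery document is missing: {', '.join(missing)}")
    if config["issuer"] != expected_issuer:
        raise ValueError("issuer does not match the configured provider")
    for key in REQUIRED_KEYS:
        if not config[key].startswith("https://"):
            raise ValueError(f"{key} is not an https URL")
    return {key: config[key] for key in REQUIRED_KEYS}


document = {
    "issuer": "https://accounts.google.com",
    "authorization_endpoint": "https://accounts.google.com/o/oauth2/v2/auth",
    "token_endpoint": "https://oauth2.googleapis.com/token",
    "jwks_uri": "https://www.googleapis.com/oauth2/v3/certs",
}
endpoints = check_discovery(document, "https://accounts.google.com")
print(endpoints["token_endpoint"])
```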
Token Introspection
Token introspection (RFC 7662) lets a resource server ask the authorization server whether a token is still active.
sequenceDiagram
participant C as Client
participant RS as Resource Server
participant AS as Auth Server
C->>RS: GET /api/resource<br>Authorization: Bearer TOKEN
RS->>AS: POST /introspect<br>token=TOKEN&<br>token_type_hint=access_token
AS->>RS: {active: true, sub: "user123", scope: "read write", exp: 1723456789}
alt token is valid
RS->>C: 200 OK + Resource
else token is invalid
RS->>C: 401 Unauthorized
end
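The decision the resource server makes from the introspection response can be sketched as a small Python function. The function name and the required_scope parameter are hypothetical; the field names (active, exp, scope) are those shown in the response above, and only active is guaranteed to be present.

```python
import time
from typing import Optional


def is_token_acceptable(introspection: dict, required_scope: str,
                        now: Optional[float] = None) -> bool:
    """Decide whether to serve the request based on an introspection response."""
    # "active" is the only required field; anything but True means reject
    if introspection.get("active") is not True:
        return False
    now = time.time() if now is None else now
    # Defensive re-check of expiry in case the response was cached
    exp = introspection.get("exp")
    if exp is not None and now >= exp:
        return False
    # scope is a space-separated string
    granted = set(introspection.get("scope", "").split())
    return required_scope in granted


response = {"active": True, "sub": "user123", "scope": "read write", "exp": 1723456789}
print(is_token_acceptable(response, "read", now=1723456000))
```

A False result maps to the 401 branch in the diagram; a missing scope is often reported as 403 instead, which this sketch does not distinguish.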
Best Practices
Flow Selection Guide
flowchart TB
START[Choose an OAuth flow] --> Q1{Is a user involved?}
Q1 -->|No| CC[Client Credentials]
Q1 -->|Yes| Q2{Client type?}
Q2 -->|Web backend| AC[Authorization Code]
Q2 -->|SPA/Mobile| PKCE[Authorization Code + PKCE]
Q2 -->|Input-constrained device| DEVICE[Device Authorization]
CC --> NOTE_CC[M2M communication<br>calls between microservices]
AC --> NOTE_AC[Most secure<br>can store client_secret]
PKCE --> NOTE_PKCE[No client_secret needed<br>prevents authorization code interception]
DEVICE --> NOTE_DEVICE[Smart TV/CLI<br>user authorizes on another device]
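The same decision tree can be written as a small lookup, which is sometimes handy in setup scripts or documentation tooling. This is only a restatement of the flowchart in Python; the function name and the client-type labels are made up for the example.

```python
def choose_flow(user_involved: bool, client_type: str = "") -> str:
    """Mirror the flowchart: pick an OAuth flow from two questions."""
    if not user_involved:
        return "client_credentials"
    flows = {
        "web_backend": "authorization_code",
        "spa": "authorization_code_pkce",
        "mobile": "authorization_code_pkce",
        "input_constrained": "device_authorization",
    }
    if client_type not in flows:
        raise ValueError(f"unknown client type: {client_type!r}")
    return flows[client_type]


print(choose_flow(False))
print(choose_flow(True, "spa"))
```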
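Security Recommendations
security_best_practices:
  tokens:
    - "Keep access tokens short-lived (5-15 minutes)"
    - "Use refresh token rotation"
    - "Bind refresh tokens to a device or client"
    - "Store tokens in a safe place (HttpOnly cookie / secure storage)"

  authorization:
    - "Always validate the state parameter to prevent CSRF"
    - "Use PKCE (recommended even for confidential clients)"
    - "Strict redirect_uri allowlist with exact matching"
    - "Request the minimum scope needed"

  implementation:
    - "Use a mature OAuth library instead of writing your own"
    - "Always validate the ID Token signature, issuer, audience, and expiry"
    - "Never pass tokens in URLs (use the POST body or a header)"
    - "Implement token revocation"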
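Refresh token rotation means every refresh returns a new refresh token and invalidates the old one; if an already-used token shows up again, the server assumes theft and revokes the whole chain. The in-memory Python sketch below illustrates the idea only. The class and method names are hypothetical, and a real authorization server would persist this state and store hashes rather than raw tokens.

```python
import secrets
from typing import Dict, Optional


class RefreshTokenStore:
    """Toy in-memory store showing rotation with reuse detection."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}  # token -> family id
        self._used: Dict[str, str] = {}    # rotated-out token -> family id

    def issue(self, family: Optional[str] = None) -> str:
        family = family or secrets.token_hex(8)
        token = secrets.token_urlsafe(32)
        self._active[token] = family
        return token

    def rotate(self, presented: str) -> str:
        if presented in self._used:
            # A rotated-out token was replayed: revoke every token in its family
            family = self._used[presented]
            self._active = {t: f for t, f in self._active.items() if f != family}
            raise PermissionError("refresh token reuse detected; family revoked")
        family = self._active.pop(presented, None)
        if family is None:
            raise PermissionError("unknown or revoked refresh token")
        self._used[presented] = family
        return self.issue(family)


store = RefreshTokenStore()
first = store.issue()
second = store.rotate(first)   # normal refresh: old token out, new token in
print(first != second)
```

Replaying first after this point raises PermissionError and also invalidates second, which forces the legitimate user to sign in again.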
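Summary
OAuth 2.0 and OIDC are the foundation of authentication and authorization in modern applications:
Authorization Code + PKCE is the most recommended flow; even confidential clients should use PKCE
Client Credentials is for service-to-service communication with no user involved
OIDC provides standardized authentication on top of OAuth; the ID Token carries the user's identity information
Token management is the key to security: short lifetimes, secure storage, timely rotation
Never implement OAuth yourself: use a mature, audited library
Understanding and implementing OAuth/OIDC correctly is an essential skill for building secure, reliable modern applications.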