Security · #oauth2#authentication#oidc

OAuth2.0与OIDC协议深入解析

2025.06.15 8 min 3.2k
// 目录 · contents

前言

OAuth 2.0是业界标准的授权框架,允许第三方应用以有限的方式访问用户资源,而无需暴露用户密码。OpenID Connect(OIDC)是构建在OAuth 2.0之上的身份认证层。本文将深入分析各种OAuth流程和OIDC的核心机制。

OAuth 2.0基本概念

角色定义

graph TB
    RO[Resource Owner<br>资源所有者<br>即最终用户] --> CLIENT[Client<br>第三方应用<br>如某App]
    CLIENT --> AS[Authorization Server<br>授权服务器<br>如Google OAuth]
    AS --> RS[Resource Server<br>资源服务器<br>如Google API]
    RO -.->|拥有资源| RS

    style RO fill:#1976d2,color:#fff
    style CLIENT fill:#388e3c,color:#fff
    style AS fill:#f57c00,color:#fff
    style RS fill:#7b1fa2,color:#fff
  • Resource Owner:拥有受保护资源的用户
  • Client:想要访问用户资源的第三方应用
  • Authorization Server:颁发令牌的服务器
  • Resource Server:托管受保护资源的服务器

Authorization Code Flow(授权码流程)

最安全、最推荐的OAuth流程,适用于有后端的Web应用。

sequenceDiagram
    participant U as User (Browser)
    participant C as Client (Backend)
    participant AS as Auth Server
    participant RS as Resource Server

    U->>C: 1. 点击"使用Google登录"
    C->>U: 2. 重定向到授权端点
    U->>AS: 3. GET /authorize?<br>response_type=code&<br>client_id=xxx&<br>redirect_uri=xxx&<br>scope=openid profile&<br>state=random123

    AS->>U: 4. 展示登录和授权页面
    U->>AS: 5. 用户授权
    AS->>U: 6. 重定向到 redirect_uri?code=AUTH_CODE&state=random123
    U->>C: 7. 回调请求 (携带code)

    Note over C: 8. 验证state防止CSRF
    C->>AS: 9. POST /token<br>grant_type=authorization_code&<br>code=AUTH_CODE&<br>client_id=xxx&<br>client_secret=xxx
    AS->>C: 10. 返回 access_token + refresh_token

    C->>RS: 11. GET /api/user<br>Authorization: Bearer access_token
    RS->>C: 12. 返回用户资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Flask OAuth2 Authorization Code实现
from flask import Flask, redirect, request, session
import requests
import secrets

app = Flask(__name__)

OAUTH_CONFIG = {
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"authorize_url": "https://accounts.google.com/o/oauth2/v2/auth",
"token_url": "https://oauth2.googleapis.com/token",
"redirect_uri": "https://yourapp.com/callback",
"scope": "openid email profile",
}

@app.route("/login")
def login():
state = secrets.token_urlsafe(32)
session["oauth_state"] = state

params = {
"response_type": "code",
"client_id": OAUTH_CONFIG["client_id"],
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"scope": OAUTH_CONFIG["scope"],
"state": state,
"access_type": "offline", # 获取refresh_token
"prompt": "consent",
}
auth_url = f"{OAUTH_CONFIG['authorize_url']}?{urlencode(params)}"
return redirect(auth_url)

@app.route("/callback")
def callback():
# 验证state防止CSRF
if request.args.get("state") != session.pop("oauth_state", None):
return "Invalid state", 403

code = request.args.get("code")
if not code:
return "Missing authorization code", 400

# 用code换取token
token_response = requests.post(OAUTH_CONFIG["token_url"], data={
"grant_type": "authorization_code",
"code": code,
"client_id": OAUTH_CONFIG["client_id"],
"client_secret": OAUTH_CONFIG["client_secret"],
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
})
tokens = token_response.json()

access_token = tokens["access_token"]
refresh_token = tokens.get("refresh_token")

# 使用access_token获取用户信息
user_info = requests.get(
"https://www.googleapis.com/oauth2/v3/userinfo",
headers={"Authorization": f"Bearer {access_token}"}
).json()

return f"Welcome, {user_info['name']}!"

Authorization Code with PKCE

PKCE(Proof Key for Code Exchange)是为公开客户端(SPA、移动端)设计的增强安全机制,不需要client_secret。

sequenceDiagram
    participant U as User
    participant SPA as SPA/Mobile App
    participant AS as Auth Server

    Note over SPA: 生成 code_verifier (随机字符串)<br>计算 code_challenge = SHA256(code_verifier)

    SPA->>AS: GET /authorize?<br>response_type=code&<br>code_challenge=xxx&<br>code_challenge_method=S256

    AS->>U: 登录授权页面
    U->>AS: 授权
    AS->>SPA: redirect_uri?code=AUTH_CODE

    SPA->>AS: POST /token<br>grant_type=authorization_code&<br>code=AUTH_CODE&<br>code_verifier=原始随机字符串

    Note over AS: 验证 SHA256(code_verifier) == 之前的code_challenge

    AS->>SPA: access_token + refresh_token
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// PKCE实现(TypeScript)
import crypto from 'crypto';

class PKCEAuthFlow {
// 1. 生成code_verifier(43-128个字符)
generateCodeVerifier(): string {
return crypto.randomBytes(32)
.toString('base64url');
}

// 2. 计算code_challenge
generateCodeChallenge(verifier: string): string {
return crypto.createHash('sha256')
.update(verifier)
.digest('base64url');
}

// 3. 构造授权URL
buildAuthUrl(codeChallenge: string): string {
const params = new URLSearchParams({
response_type: 'code',
client_id: 'your_client_id',
redirect_uri: 'https://app.example.com/callback',
scope: 'openid profile email',
state: crypto.randomBytes(16).toString('hex'),
code_challenge: codeChallenge,
code_challenge_method: 'S256',
});
return `https://auth.example.com/authorize?${params}`;
}

// 4. 用code换取token(携带code_verifier)
async exchangeToken(code: string, codeVerifier: string) {
const response = await fetch('https://auth.example.com/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'authorization_code',
code,
client_id: 'your_client_id',
redirect_uri: 'https://app.example.com/callback',
code_verifier: codeVerifier, // 关键:发送原始verifier
}),
});
return response.json();
}
}

Client Credentials Flow

适用于服务间通信(M2M),没有用户参与。

sequenceDiagram
    participant S as Service A
    participant AS as Auth Server
    participant RS as Service B (Resource)

    S->>AS: POST /token<br>grant_type=client_credentials&<br>client_id=xxx&<br>client_secret=xxx&<br>scope=api:read

    AS->>S: access_token (短生命周期)

    S->>RS: GET /api/data<br>Authorization: Bearer access_token
    RS->>RS: 验证token
    RS->>S: 返回数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Go Client Credentials实现
func getM2MToken(clientID, clientSecret, tokenURL string) (string, error) {
data := url.Values{
"grant_type": {"client_credentials"},
"client_id": {clientID},
"client_secret": {clientSecret},
"scope": {"api:read api:write"},
}

resp, err := http.PostForm(tokenURL, data)
if err != nil {
return "", fmt.Errorf("token request failed: %w", err)
}
defer resp.Body.Close()

var result struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
TokenType string `json:"token_type"`
}

if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return "", fmt.Errorf("decode response failed: %w", err)
}

return result.AccessToken, nil
}

Device Authorization Flow

适用于输入受限的设备(智能电视、IoT设备、CLI工具)。

sequenceDiagram
    participant D as Device (Smart TV)
    participant U as User (Phone/PC)
    participant AS as Auth Server

    D->>AS: POST /device/code<br>client_id=xxx&scope=openid

    AS->>D: device_code + user_code + verification_uri

    Note over D: 显示: 请访问 https://auth.example.com/device<br>输入代码: WDJB-MJHT

    D->>AS: 轮询 POST /token<br>grant_type=urn:ietf:params:oauth:grant-type:device_code&<br>device_code=xxx

    U->>AS: 访问验证URI,输入user_code
    U->>AS: 登录并授权

    AS->>D: access_token + refresh_token (轮询成功)
    Note over D: 设备登录完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Device Flow实现
import time
import requests

def device_auth_flow(client_id, auth_server):
# 1. 获取设备码
resp = requests.post(f"{auth_server}/device/code", data={
"client_id": client_id,
"scope": "openid profile"
})
device_data = resp.json()

print(f"请访问: {device_data['verification_uri_complete']}")
print(f"或访问 {device_data['verification_uri']} 输入代码: {device_data['user_code']}")

# 2. 轮询等待用户授权
interval = device_data.get("interval", 5)
device_code = device_data["device_code"]

while True:
time.sleep(interval)

token_resp = requests.post(f"{auth_server}/token", data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": client_id,
})

result = token_resp.json()

if "access_token" in result:
return result
elif result.get("error") == "authorization_pending":
continue
elif result.get("error") == "slow_down":
interval += 5
elif result.get("error") == "expired_token":
raise Exception("Device code expired")
else:
raise Exception(f"Error: {result.get('error')}")

OpenID Connect(OIDC)

OIDC在OAuth 2.0之上添加身份认证层。OAuth 2.0只负责授权(authorization),OIDC增加了认证(authentication)。

graph TB
    subgraph "OAuth 2.0"
        AUTH[Authorization<br>授权]
        AT[Access Token<br>访问令牌]
    end

    subgraph "OIDC (基于OAuth 2.0)"
        AUTHN[Authentication<br>认证]
        IDT[ID Token (JWT)<br>身份令牌]
        UI[UserInfo Endpoint<br>用户信息端点]
        DISC[Discovery<br>/.well-known/openid-configuration]
    end

    AUTH --> AUTHN
    AT --> IDT
    AT --> UI

    style OIDC fill:#e8f5e9

ID Token(JWT)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ID Token解码后的内容
{
// Header
"alg": "RS256",
"kid": "1234567890",
"typ": "JWT"
}
.
{
// Payload
"iss": "https://accounts.google.com", // 签发者
"sub": "110169484474386276334", // 用户唯一标识
"aud": "your_client_id", // 接收者(Client ID)
"exp": 1723456789, // 过期时间
"iat": 1723453189, // 签发时间
"nonce": "abc123", // 防重放
"at_hash": "HK6E_P6Dh8Y93mRNtsDB1Q", // access_token哈希
"email": "[email protected]",
"email_verified": true,
"name": "John Doe",
"picture": "https://example.com/photo.jpg"
}
.
{signature}

ID Token验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# ID Token验证流程
from jose import jwt, JWTError
import requests

class IDTokenVerifier:
def __init__(self, issuer, client_id):
self.issuer = issuer
self.client_id = client_id
self.jwks = self._fetch_jwks()

def _fetch_jwks(self):
"""从OIDC Discovery获取签名公钥"""
config_url = f"{self.issuer}/.well-known/openid-configuration"
config = requests.get(config_url).json()
jwks_uri = config["jwks_uri"]
return requests.get(jwks_uri).json()

def verify(self, id_token, nonce=None):
"""验证ID Token"""
try:
payload = jwt.decode(
id_token,
self.jwks,
algorithms=["RS256"],
audience=self.client_id,
issuer=self.issuer,
)

# 验证nonce(防止重放攻击)
if nonce and payload.get("nonce") != nonce:
raise ValueError("Invalid nonce")

return payload

except JWTError as e:
raise ValueError(f"Invalid ID token: {e}")

UserInfo Endpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 获取用户信息
curl -H "Authorization: Bearer ACCESS_TOKEN" \
https://openidconnect.googleapis.com/v1/userinfo

# 响应
{
"sub": "110169484474386276334",
"name": "John Doe",
"given_name": "John",
"family_name": "Doe",
"picture": "https://example.com/photo.jpg",
"email": "[email protected]",
"email_verified": true,
"locale": "en"
}

OIDC Discovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 标准发现端点
curl https://accounts.google.com/.well-known/openid-configuration | jq

{
"issuer": "https://accounts.google.com",
"authorization_endpoint": "https://accounts.google.com/o/oauth2/v2/auth",
"token_endpoint": "https://oauth2.googleapis.com/token",
"userinfo_endpoint": "https://openidconnect.googleapis.com/v1/userinfo",
"revocation_endpoint": "https://oauth2.googleapis.com/revoke",
"jwks_uri": "https://www.googleapis.com/oauth2/v3/certs",
"response_types_supported": ["code", "token", "id_token", ...],
"scopes_supported": ["openid", "email", "profile"],
"id_token_signing_alg_values_supported": ["RS256"]
}

Token Introspection

用于验证token是否有效(RFC 7662),常用于资源服务器。

sequenceDiagram
    participant C as Client
    participant RS as Resource Server
    participant AS as Auth Server

    C->>RS: GET /api/resource<br>Authorization: Bearer TOKEN

    RS->>AS: POST /introspect<br>token=TOKEN&<br>token_type_hint=access_token

    AS->>RS: {active: true, sub: "user123", scope: "read write", exp: 1723456789}

    alt token有效
        RS->>C: 200 OK + Resource
    else token无效
        RS->>C: 401 Unauthorized
    end

最佳实践

流程选择指南

flowchart TB
    START[选择OAuth流程] --> Q1{有用户参与?}
    Q1 -->|否| CC[Client Credentials]
    Q1 -->|是| Q2{客户端类型?}
    Q2 -->|Web后端| AC[Authorization Code]
    Q2 -->|SPA/Mobile| PKCE[Authorization Code + PKCE]
    Q2 -->|输入受限设备| DEVICE[Device Authorization]

    CC --> NOTE_CC[M2M通信<br>微服务间调用]
    AC --> NOTE_AC[最安全<br>可存储client_secret]
    PKCE --> NOTE_PKCE[不需要client_secret<br>防止授权码拦截]
    DEVICE --> NOTE_DEVICE[智能TV/CLI<br>用户在其他设备授权]

安全建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
security_best_practices:
tokens:
- "Access Token生命周期短 (5-15分钟)"
- "使用Refresh Token轮换 (Rotation)"
- "Refresh Token绑定设备/客户端"
- "Token存储在安全位置 (HttpOnly Cookie / Secure Storage)"

authorization:
- "始终验证state参数防止CSRF"
- "使用PKCE (即使是机密客户端也推荐)"
- "严格的redirect_uri白名单,精确匹配"
- "最小scope原则"

implementation:
- "使用成熟的OAuth库而非自己实现"
- "ID Token必须验证签名、issuer、audience、expiry"
- "不要在URL中传递token (用POST body或Header)"
- "实现token撤销机制"

总结

OAuth 2.0和OIDC是现代应用身份认证和授权的基石:

  1. Authorization Code + PKCE是最推荐的流程,即使是机密客户端也应使用PKCE
  2. Client Credentials适用于无用户参与的服务间通信
  3. OIDC在OAuth之上提供标准化的身份认证,ID Token携带用户身份信息
  4. Token管理是安全的关键——短生命周期、安全存储、及时轮换
  5. 永远不要自己实现OAuth——使用经过审计的成熟库

正确理解和实现OAuth/OIDC,是构建安全可靠的现代应用的必备技能。

作者 · authorzt
发布 · date2025-06-15
篇幅 · length3.2k 字 · 8 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论