AI · #mlops#pipeline#model-serving

MLOps Pipeline Design and Implementation

2025.10.29 7 min 2.7k

Introduction

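MLOps (Machine Learning Operations) applies DevOps practices to machine learning and manages the full lifecycle of a model, from experiment to production. A mature MLOps pipeline delivers reproducible experiments, automated training, reliable deployment, and continuous monitoring. This article walks through the design and implementation of each stage: the feature store, experiment tracking (MLflow), the model registry, online serving (TF Serving/Triton), and monitoring with drift detection.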

MLOps Architecture Overview

graph TB
    subgraph "Data Layer"
        A[Data Sources] --> B[Data Pipeline<br/>ETL/ELT]
        B --> C[Feature Store]
    end

    subgraph "Experiment Layer"
        C --> D[Feature Engineering]
        D --> E[Model Training]
        E --> F[Experiment Tracking<br/>MLflow]
        F --> G[Model Evaluation]
    end

    subgraph "Deployment Layer"
        G --> H[Model Registry]
        H --> I{Deployment Mode}
        I --> J[Online Inference<br/>REST/gRPC]
        I --> K[Batch Inference<br/>Spark/Airflow]
        I --> L[Edge Inference<br/>TFLite/ONNX]
    end

    subgraph "Monitoring Layer"
        J --> M[Performance Monitoring]
        J --> N[Data Drift Detection]
        J --> O[Model Drift Detection]
        M --> P[Alerting & Retraining Trigger]
        N --> P
        O --> P
        P --> D
    end

    style C fill:#3498db,color:#fff
    style F fill:#e74c3c,color:#fff
    style H fill:#2ecc71,color:#fff
    style M fill:#f39c12,color:#000

Feature Engineering and the Feature Store

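A feature store centralizes feature definitions, computation, and serving, so that training and inference read the same feature values and avoid training-serving skew: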

graph LR
    subgraph "Feature Store"
        A[Feature Definition<br/>DSL] --> B[Offline Store<br/>Batch / Historical Features]
        A --> C[Online Store<br/>Real-time Features / Redis]
        D[Feature Transform<br/>Feature Computation] --> B
        D --> C
    end

    E[Training Pipeline] --> B
    F[Online Inference] --> C

    style B fill:#3498db,color:#fff
    style C fill:#e74c3c,color:#fff

Using Feast

# feature_store/definitions.py
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String

# Define entity (user_id is the join key between entity rows and feature rows)
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    value_type=ValueType.INT64,
    description="User identifier",
)

# Define feature source
user_features_source = FileSource(
    path="data/user_features.parquet",
    timestamp_field="event_timestamp",
)

# Define feature view: `schema` takes Field objects typed with feast.types.
# ttl: feature values older than 1 day at lookup time are treated as missing.
user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    source=user_features_source,
    online=True,
)
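# Training: get historical features
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="./feature_store")

# Entity DataFrame: one row per user_id and the event_timestamp at which
# the features are needed (example values)
entity_df = pd.DataFrame({
    "user_id": [12345, 67890],
    "event_timestamp": pd.to_datetime(["2025-10-01", "2025-10-15"], utc=True),
})

# Point-in-time correct feature retrieval
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()

# Serving: get online features (they must first be materialized into the
# online store, e.g. with `feast materialize-incremental`)
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()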
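Point-in-time correctness means each training row only sees feature values that were already known at that row's `event_timestamp`, and, with a TTL, no value older than the TTL. The following is a minimal pure-Python sketch of that as-of join on hypothetical data, meant to illustrate the idea; it is not how Feast implements `get_historical_features`.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, feature_rows, ttl):
    """Attach to each (user_id, event_timestamp) row the latest feature value
    observed at or before event_timestamp, ignoring values older than ttl."""
    # Group feature observations by entity, sorted by observation time
    history = {}
    for user_id, ts, value in feature_rows:
        history.setdefault(user_id, []).append((ts, value))
    for rows in history.values():
        rows.sort()

    joined = []
    for user_id, event_ts in entity_rows:
        rows = history.get(user_id, [])
        timestamps = [ts for ts, _ in rows]
        # Index of the last observation with ts <= event_ts (never a future value)
        i = bisect_right(timestamps, event_ts) - 1
        value = None
        if i >= 0 and event_ts - timestamps[i] <= ttl:
            value = rows[i][1]
        joined.append((user_id, event_ts, value))
    return joined


# Hypothetical "total_purchases" observations: (user_id, observed_at, value)
features = [
    (1, datetime(2025, 10, 1), 3),
    (1, datetime(2025, 10, 5), 4),
    (2, datetime(2025, 9, 1), 10),
]
# Label events that need training rows: (user_id, event_timestamp)
entities = [
    (1, datetime(2025, 10, 3)),  # gets 3: the Oct 5 value lies in the future
    (1, datetime(2025, 10, 6)),  # gets 4
    (2, datetime(2025, 10, 6)),  # gets None: the Sep 1 value is older than the TTL
]
training_rows = point_in_time_join(entities, features, ttl=timedelta(days=7))
print(training_rows)
```

A naive "latest value per user" join would have given the first row the Oct 5 value, leaking future information into training.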

Experiment Tracking: MLflow

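MLflow is one of the most widely used experiment-tracking tools. For every run it records parameters, metrics, models, and dataset information, and it versions the resulting models: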

import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# X_train, X_test, y_train, y_test (pandas objects) and feature_names are
# assumed to come from the feature pipeline above.

# Set tracking URI
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("fraud-detection")

# Training with experiment tracking
with mlflow.start_run(run_name="rf-v2-tuned") as run:
    # Log parameters
    params = {
        "n_estimators": 200,
        "max_depth": 15,
        "min_samples_split": 5,
        "class_weight": "balanced",
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate (binary labels: precision/recall/F1 refer to the positive, fraud class)
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
    }
    mlflow.log_metrics(metrics)

    # Log model with an input/output signature and register it as a new version
    signature = infer_signature(X_test, y_pred)
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        signature=signature,
        registered_model_name="fraud-detector",
    )

    # Log additional artifacts (the PNG must already exist on disk)
    mlflow.log_artifact("feature_importance.png")
    mlflow.log_dict({"features": feature_names}, "feature_config.json")

    # Log dataset info
    mlflow.log_input(
        mlflow.data.from_pandas(X_train, name="training_data"),
        context="training",
    )

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")
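Logging F1, precision, and recall next to accuracy matters because fraud labels are heavily imbalanced: a model that never predicts fraud can still score high accuracy. A small pure-Python sketch of the four logged metrics for binary labels, following the usual definitions with 0/0 mapped to 0.0, on a hypothetical label set:

```python
def classification_metrics(y_true, y_pred):
    """Accuracy, precision, recall and F1 for binary labels (1 = fraud)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    # Undefined ratios (0/0) are reported as 0.0
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Hypothetical imbalanced test set: 2 fraud cases out of 10
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
# A degenerate model that predicts "not fraud" for everything
always_negative = [0] * 10
print(classification_metrics(y_true, always_negative))
```

Here accuracy is 0.8 while recall and F1 are 0, which is why the CI quality gate later in this article checks F1 in addition to accuracy.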

MLflow Model Registry

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition model stage.
# Note: stages are deprecated in recent MLflow releases (2.9+) in favor of aliases,
# e.g. client.set_registered_model_alias("fraud-detector", "champion", 3)
client.transition_model_version_stage(
    name="fraud-detector",
    version=3,
    stage="Production",
    archive_existing_versions=True,  # Archive the previous Production version
)

# Load production model; new_data must match the logged model signature
model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")
predictions = model.predict(new_data)

# Compare candidate runs from the experiment above
runs = mlflow.search_runs(
    experiment_names=["fraud-detection"],
    filter_string="metrics.f1 > 0.85",
    order_by=["metrics.f1 DESC"],
    max_results=5,
)
print(runs[["run_id", "params.n_estimators", "metrics.f1"]])
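To make `archive_existing_versions=True` and the `models:/<name>/<stage>` lookup concrete, here is a toy in-memory registry. `ToyModelRegistry` is a hypothetical illustration of the versioning and promotion semantics, not MLflow's implementation. It also shows why rollback is cheap: promoting an older version again is just another transition.

```python
class ToyModelRegistry:
    """In-memory illustration of registry versioning and stage promotion."""

    def __init__(self):
        # model name -> list of {"version", "run_id", "stage"}
        self.versions = {}

    def register(self, name, run_id):
        versions = self.versions.setdefault(name, [])
        entry = {"version": len(versions) + 1, "run_id": run_id, "stage": "None"}
        versions.append(entry)
        return entry["version"]

    def transition(self, name, version, stage, archive_existing_versions=False):
        for entry in self.versions[name]:
            if entry["version"] == version:
                entry["stage"] = stage
            elif archive_existing_versions and entry["stage"] == stage:
                # Demote whichever version previously held this stage
                entry["stage"] = "Archived"

    def latest(self, name, stage):
        # Resolve something like models:/<name>/<stage> to the newest version in that stage
        matches = [e for e in self.versions[name] if e["stage"] == stage]
        return max(matches, key=lambda e: e["version"]) if matches else None


registry = ToyModelRegistry()
for run_id in ["run-a", "run-b", "run-c"]:
    registry.register("fraud-detector", run_id)
registry.transition("fraud-detector", 2, "Production")
registry.transition("fraud-detector", 3, "Production", archive_existing_versions=True)
prod = registry.latest("fraud-detector", "Production")
print(prod)  # version 3 serves; version 2 is now Archived
```

Rolling back would be `registry.transition("fraud-detector", 2, "Production", archive_existing_versions=True)`.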

模型服务(Model Serving)

TensorFlow Serving

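# Pull and run the TF Serving Docker image (REST API on port 8501).
# /models/fraud_detector must contain numbered version subdirectories,
# e.g. /models/fraud_detector/1/saved_model.pb
docker run -p 8501:8501 \
  --mount type=bind,source=/models/fraud_detector,target=/models/fraud_detector \
  -e MODEL_NAME=fraud_detector \
  tensorflow/serving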
# Client request
import requests

# Row ("instances") format: one object per example, keyed by the model's input names
data = {
    "instances": [
        {"feature1": 0.5, "feature2": 1.2, "feature3": -0.3},
        {"feature1": 0.8, "feature2": 0.1, "feature3": 0.6},
    ]
}

response = requests.post(
    "http://localhost:8501/v1/models/fraud_detector:predict",
    json=data,
    timeout=5,
)
response.raise_for_status()

predictions = response.json()["predictions"]
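TF Serving's REST predict endpoint accepts two request layouts: the row format used above (`"instances"`, one object per example) and a columnar format (`"inputs"`, one list per named input); columnar requests get their results under `"outputs"` instead of `"predictions"`. A sketch of converting rows to columns, assuming every row carries the same input names:

```python
import json


def rows_to_columnar(instances):
    """Convert TF Serving's row format (list of per-example dicts) into the
    columnar format (dict of per-input lists)."""
    names = instances[0].keys()
    return {"inputs": {name: [row[name] for row in instances] for name in names}}


row_payload = {
    "instances": [
        {"feature1": 0.5, "feature2": 1.2, "feature3": -0.3},
        {"feature1": 0.8, "feature2": 0.1, "feature3": 0.6},
    ]
}
columnar_payload = rows_to_columnar(row_payload["instances"])
print(json.dumps(columnar_payload))
```

The columnar form is more compact when a batch has many examples, since each input name appears once.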

NVIDIA Triton Inference Server

graph TB
    A[Client Requests] --> B[Triton Inference Server]

    B --> C[Model Repository]
    C --> C1[PyTorch Model<br/>.pt]
    C --> C2[ONNX Model<br/>.onnx]
    C --> C3[TensorRT Model<br/>.plan]
    C --> C4[Python Backend<br/>.py]

    B --> D[Features]
    D --> D1[Dynamic Batching]
    D --> D2[Model Ensemble]
    D --> D3[Concurrent Execution]
    D --> D4[GPU Scheduling]

    style B fill:#76b900,color:#fff
    style D1 fill:#3498db,color:#fff
# Triton model repository structure
models/
├── fraud_detector/
│   ├── config.pbtxt
│   ├── 1/
│   │   └── model.onnx
│   └── 2/
│       └── model.onnx
└── text_encoder/
    ├── config.pbtxt
    └── 1/
        └── model.pt
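# config.pbtxt
name: "fraud_detector"
platform: "onnxruntime_onnx"
max_batch_size: 64

input [
  {
    name: "features"
    data_type: TYPE_FP32
    dims: [ 20 ]  # per-example shape; the batch dimension is implicit since max_batch_size > 0
  }
]

output [
  {
    name: "prediction"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]

# Hold requests for up to 100 microseconds to assemble larger batches
dynamic_batching {
  preferred_batch_size: [ 16, 32 ]
  max_queue_delay_microseconds: 100
}

# Run two instances of the model on GPU 0
instance_group [
  {
    count: 2
    kind: KIND_GPU
    gpus: [ 0 ]
  }
]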
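Dynamic batching trades a little latency for throughput: the server holds incoming requests for up to `max_queue_delay_microseconds` so they can run as one batch of at most `max_batch_size`. The greedy simulation below is a deliberately simplified model of that trade-off (it ignores `preferred_batch_size`, multiple instances, and real dispatch timing); it is not Triton's scheduler.

```python
def form_batches(arrival_us, max_batch_size, max_queue_delay_us):
    """Group request arrival times into batches: a request joins the open
    batch unless the batch is full or its oldest request would have waited
    longer than max_queue_delay_us; then a new batch starts.
    Only the grouping is modeled, not the exact dispatch times."""
    batches, current = [], []
    for t in sorted(arrival_us):
        if current and (
            len(current) == max_batch_size or t - current[0] > max_queue_delay_us
        ):
            batches.append(current)
            current = []
        current.append(t)
    if current:
        batches.append(current)
    return batches


# Hypothetical request arrival times in microseconds
arrivals = [0, 10, 50, 150, 160, 400]
print(form_batches(arrivals, max_batch_size=64, max_queue_delay_us=100))
```

With the 100 µs delay from the config, requests arriving within 100 µs of the oldest queued request share a batch; a longer delay yields larger batches at the cost of tail latency.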
# Triton client
import numpy as np
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")

# Prepare input: shape (batch, 20) to match dims: [ 20 ] in config.pbtxt.
# "..." stands for the remaining feature values, which must be filled in.
input_data = np.array([[0.5, 1.2, -0.3, ...]], dtype=np.float32)
inputs = [httpclient.InferInput("features", input_data.shape, "FP32")]
inputs[0].set_data_from_numpy(input_data)

# Inference
result = client.infer(model_name="fraud_detector", inputs=inputs)
prediction = result.as_numpy("prediction")
print(f"Prediction: {prediction}")

Custom Serving with FastAPI

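from contextlib import asynccontextmanager

import mlflow
import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

MODEL_URI = "models:/fraud-detector/Production"
model = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the model once at startup, before the first request is served
    global model
    model = mlflow.pyfunc.load_model(MODEL_URI)
    yield


app = FastAPI(title="ML Model Serving", lifespan=lifespan)


class PredictionRequest(BaseModel):
    features: list[list[float]]


class PredictionResponse(BaseModel):
    predictions: list[float]
    model_version: str


@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    # Plain `def`: FastAPI runs it in a thread pool, so CPU-bound inference
    # does not block the event loop
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    try:
        # If the model was logged with a named-column signature, build a
        # DataFrame with those column names instead of a bare array
        input_data = np.array(request.features)
        predictions = model.predict(input_data).tolist()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return PredictionResponse(
        predictions=predictions,
        model_version="3",  # hard-coded; should track the registry version actually loaded
    )


@app.get("/health")
def health():
    return {"status": "healthy", "model_loaded": model is not None}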

Monitoring and Drift Detection

graph TB
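    A[Model Monitoring Dimensions] --> B[Performance]
    A --> C[Data Drift]
    A --> D[Concept Drift]
    A --> E[System Health]

    B --> B1[Accuracy/F1 Trend]
    B --> B2[Latency P50/P99]
    B --> B3[Throughput QPS]

    C --> C1[Input Feature Distribution Shift]
    C --> C2[PSI / KS Test]
    C --> C3[Feature Missing Rate]

    D --> D1[Prediction Distribution Shift]
    D --> D2[Label vs. Prediction Gap]

    E --> E1[GPU Utilization]
    E --> E2[Memory Usage]
    E --> E3[Error Rate]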

    style C fill:#e74c3c,color:#fff
    style D fill:#f39c12,color:#000

Data Drift Detection

import numpy as np
from scipy import stats


def detect_drift_ks(reference_data, production_data, threshold=0.05):
    """
    Kolmogorov-Smirnov test for data drift detection.

    Runs a two-sample KS test for each numerical column of two pandas
    DataFrames; a feature is flagged when its p-value is below `threshold`.
    """
    drift_results = {}

    for feature in reference_data.columns:
        statistic, p_value = stats.ks_2samp(
            reference_data[feature],
            production_data[feature],
        )
        is_drifted = p_value < threshold
        drift_results[feature] = {
            "statistic": statistic,
            "p_value": p_value,
            "is_drifted": is_drifted,
        }

    return drift_results


def calculate_psi(expected, actual, bins=10):
    """
    Population Stability Index (PSI) for drift detection.

    PSI < 0.1: No drift
    0.1 <= PSI < 0.2: Moderate drift
    PSI >= 0.2: Significant drift
    """
    # Both histograms must share the same bins, so the edges come from the
    # reference (expected) data; production values outside that range are
    # clipped into the outermost bins instead of being silently dropped.
    edges = np.histogram_bin_edges(expected, bins=bins)
    actual = np.clip(actual, edges[0], edges[-1])
    expected_percents = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_percents = np.histogram(actual, bins=edges)[0] / len(actual)

    # Avoid division by zero and log(0) for empty bins
    expected_percents = np.clip(expected_percents, 0.001, None)
    actual_percents = np.clip(actual_percents, 0.001, None)

    psi = np.sum(
        (actual_percents - expected_percents) *
        np.log(actual_percents / expected_percents)
    )

    return psi


# Usage: training_data and last_week_data are DataFrames with the same numerical columns
drift_results = detect_drift_ks(training_data, last_week_data)
for feature, result in drift_results.items():
    if result["is_drifted"]:
        print(f"DRIFT DETECTED in {feature}: p-value={result['p_value']:.4f}")
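`stats.ks_2samp` reports the KS statistic D, the largest vertical gap between the empirical CDFs of the two samples, together with a p-value. A minimal pure-Python sketch of D alone (no p-value), to show what is actually being measured:

```python
from bisect import bisect_right


def ks_statistic(sample_a, sample_b):
    """Two-sample Kolmogorov-Smirnov statistic: the largest absolute gap
    between the empirical CDFs of the two samples."""
    a, b = sorted(sample_a), sorted(sample_b)
    d = 0.0
    # Both ECDFs are step functions that only change at observed values,
    # so checking every observed value finds the maximum gap
    for x in a + b:
        cdf_a = bisect_right(a, x) / len(a)
        cdf_b = bisect_right(b, x) / len(b)
        d = max(d, abs(cdf_a - cdf_b))
    return d


reference = [1, 2, 3, 4, 5]
shifted = [3, 4, 5, 6, 7]
print(ks_statistic(reference, shifted))
```

Because the p-value shrinks as the sample size grows, a KS test on large production windows can flag shifts too small to matter in practice; looking at an effect size such as D itself or PSI alongside the p-value helps.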

Monitoring with Evidently

# Written against Evidently's older Report/ColumnMapping API; recent releases
# restructured these imports, so pin the Evidently version.
from evidently import ColumnMapping
from evidently.metrics import (
    ClassificationQualityMetric,
    DataDriftTable,
    DatasetDriftMetric,
)
from evidently.report import Report

# Define column mapping
column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category"],
)

# Create drift report
report = Report(metrics=[
    DatasetDriftMetric(),
    DataDriftTable(),
    ClassificationQualityMetric(),
])

# Both DataFrames need the label and prediction columns for ClassificationQualityMetric
report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)

# Save report
report.save_html("drift_report.html")

# Get results programmatically (metrics[0] is DatasetDriftMetric)
results = report.as_dict()
dataset_drift = results["metrics"][0]["result"]["dataset_drift"]
if dataset_drift:
    print("Dataset drift detected! Consider retraining.")
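Per-feature tests still need a dataset-level decision. Evidently's `DatasetDriftMetric` makes it by comparing the share of drifted columns with a threshold (`drift_share`; its default, 0.5 in the older API, may differ between versions). A sketch of the same rule applied to results shaped like the output of `detect_drift_ks` above, with hypothetical values; `dataset_drift_decision` is an illustrative helper, not an Evidently function:

```python
def dataset_drift_decision(drift_results, drift_share=0.5):
    """Flag dataset-level drift when the share of drifted features reaches
    drift_share. Returns (is_drifted, share)."""
    flags = [r["is_drifted"] for r in drift_results.values()]
    share = sum(flags) / len(flags)
    return share >= drift_share, share


# Hypothetical per-feature results in the detect_drift_ks format
per_feature = {
    "feature_1": {"p_value": 0.001, "is_drifted": True},
    "feature_2": {"p_value": 0.40, "is_drifted": False},
    "feature_3": {"p_value": 0.02, "is_drifted": True},
    "category": {"p_value": 0.75, "is_drifted": False},
}
drifted, share = dataset_drift_decision(per_feature)
print(drifted, share)
```

Lowering `drift_share` makes the retraining trigger more sensitive; raising it tolerates drift in a few unimportant features.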

CI/CD for ML

graph LR
    A[Code Push] --> B[CI Pipeline]
    B --> C[Data Validation]
    C --> D[Unit Tests]
    D --> E[Model Training]
    E --> F[Model Evaluation]
    F --> G{Quality Gate}
    G -->|Pass| H[Model Registry]
    G -->|Fail| I[Notify & Block]
    H --> J[Staging Deploy]
    J --> K[A/B Test / Shadow]
    K --> L{Performance OK?}
    L -->|Yes| M[Production Deploy]
    L -->|No| I

    style G fill:#f39c12,color:#000
    style L fill:#f39c12,color:#000
    style M fill:#2ecc71,color:#fff
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'models/**'
      - 'data/**'
      - 'training/**'

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    env:
      # Training, evaluation and registration all talk to MLflow, so set the URI at job level
      MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Validate data
        run: python scripts/validate_data.py

      - name: Run unit tests
        run: pytest tests/ -v

      # Assumption: train.py appends "RUN_ID=<mlflow run id>" to the file at
      # $GITHUB_ENV; that is what makes env.RUN_ID available to later steps
      - name: Train model
        run: python training/train.py --config configs/production.yaml

      # Quality gate: assumes evaluate.py exits non-zero when a threshold is
      # missed, which fails the job and skips registration
      - name: Evaluate model
        run: |
          python training/evaluate.py \
            --model-uri "runs:/${{ env.RUN_ID }}/model" \
            --min-f1 0.85 \
            --min-accuracy 0.90

      - name: Register model
        if: success()
        run: |
          python scripts/register_model.py \
            --run-id ${{ env.RUN_ID }} \
            --stage Staging
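The workflow relies on `training/evaluate.py` failing its step when the model misses a threshold; that script is not shown in this article. A hypothetical sketch of its gate logic, using the same `--model-uri`, `--min-f1` and `--min-accuracy` flags as the workflow; model loading and scoring are stubbed out with placeholder numbers:

```python
import argparse
import sys


def quality_gate(metrics, min_f1, min_accuracy):
    """Return the list of failed checks; an empty list means the gate passes."""
    failures = []
    if metrics["f1"] < min_f1:
        failures.append(f"f1 {metrics['f1']:.3f} < {min_f1}")
    if metrics["accuracy"] < min_accuracy:
        failures.append(f"accuracy {metrics['accuracy']:.3f} < {min_accuracy}")
    return failures


def main(argv=None):
    parser = argparse.ArgumentParser(description="Model quality gate (sketch)")
    parser.add_argument("--model-uri", required=True)
    parser.add_argument("--min-f1", type=float, default=0.85)
    parser.add_argument("--min-accuracy", type=float, default=0.90)
    args = parser.parse_args(argv)

    # Placeholder values, not real results: a real script would load
    # args.model_uri and score it on a held-out evaluation set
    metrics = {"f1": 0.87, "accuracy": 0.93}

    failures = quality_gate(metrics, args.min_f1, args.min_accuracy)
    for failure in failures:
        print(f"Quality gate failed: {failure}", file=sys.stderr)
    # Non-zero means "fail the CI step"
    return 1 if failures else 0
```

In a real script, `sys.exit(main())` under an `if __name__ == "__main__":` guard turns the return value into the process exit code, which is what GitHub Actions checks before running "Register model".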

Summary

The core goal of MLOps is to make machine learning systems reproducible, reliable, and maintainable. Key practices:

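  1. Feature Store: unify the feature pipelines for training and inference to avoid training-serving skew
  2. Experiment tracking: use MLflow to record the parameters, metrics, and artifacts of every run
  3. Model registry: version models so you can roll back and run A/B tests
  4. Automated deployment: CI/CD pipelines with quality gates for data validation and model evaluation
  5. Continuous monitoring: detect data drift and performance degradation, and trigger retraining automatically
  6. Triton/TF Serving: production-grade model serving with dynamic batching and multi-model management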

Start small: get experiment tracking and model versioning right first, then add a feature store, drift detection, and automated retraining step by step.

Author · zt
Published · 2025-10-29
Length · 2.7k characters · 7 min
License · CC BY-SA 4.0