graph TB
subgraph "Data Layer"
A[Data Sources] --> B[Data Pipeline<br/>ETL/ELT]
B --> C[Feature Store]
end
subgraph "Experiment Layer"
C --> D[Feature Engineering]
D --> E[Model Training]
E --> F[Experiment Tracking<br/>MLflow]
F --> G[Model Evaluation]
end
subgraph "Deployment Layer"
G --> H[Model Registry]
H --> I{Deployment Mode}
I --> J[Online Inference<br/>REST/gRPC]
I --> K[Batch Inference<br/>Spark/Airflow]
I --> L[Edge Inference<br/>TFLite/ONNX]
end
subgraph "Monitoring Layer"
J --> M[Performance Monitoring]
J --> N[Data Drift Detection]
J --> O[Model Drift Detection]
M --> P[Alerting & Retraining Trigger]
N --> P
O --> P
P --> D
end
style C fill:#3498db,color:#fff
style F fill:#e74c3c,color:#fff
style H fill:#2ecc71,color:#fff
style M fill:#f39c12,color:#000
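Feature Engineering and Feature Store
A Feature Store centrally manages feature definitions, computation, and serving, so that training and inference read the same feature values and training-serving skew is avoided: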
graph LR
subgraph "Feature Store"
A[Feature Definition<br/>DSL] --> B[Offline Store<br/>Batch Features / History]
A --> C[Online Store<br/>Real-Time Features / Redis]
D[Feature Transform<br/>Feature Computation] --> B
D --> C
end
E[Training Pipeline] --> B
F[Online Inference] --> C
style B fill:#3498db,color:#fff
style C fill:#e74c3c,color:#fff
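To make the diagram concrete, here is a minimal in-memory sketch of the idea: one set of feature definitions feeds both an offline (training) path and an online (serving) path, so the two cannot drift apart. The class and method names (`FeatureDefinition`, `TinyFeatureStore`, `ingest`, `get_online_features`) are hypothetical and do not correspond to any particular feature store product; a real deployment would back the online path with something like Redis.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class FeatureDefinition:
    """A named feature with a single transform shared by every consumer."""
    name: str
    transform: Callable[[dict], float]


class TinyFeatureStore:
    """Toy store (hypothetical): one set of definitions, two read paths."""

    def __init__(self, definitions: List[FeatureDefinition]):
        self._definitions = definitions
        self._offline: List[dict] = []                   # history for training
        self._online: Dict[str, Dict[str, float]] = {}   # latest row per entity

    def ingest(self, entity_id: str, raw: dict) -> None:
        # Compute each feature once, then write the same values to both stores.
        row = {d.name: d.transform(raw) for d in self._definitions}
        self._offline.append({"entity_id": entity_id, **row})
        self._online[entity_id] = row

    def get_training_rows(self) -> List[dict]:
        return list(self._offline)

    def get_online_features(self, entity_id: str) -> Dict[str, float]:
        return dict(self._online[entity_id])


store = TinyFeatureStore([
    FeatureDefinition("amount_usd", lambda r: r["amount_cents"] / 100),
    FeatureDefinition("is_night", lambda r: float(r["hour"] < 6)),
])
store.ingest("user-1", {"amount_cents": 1250, "hour": 3})
print(store.get_online_features("user-1"))
```

Because both read paths are populated from the same `transform` call, a change to a feature's logic reaches training and serving together, which is the property the Feature Store is meant to guarantee.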
# Transition model stage
client.transition_model_version_stage(
    name="fraud-detector",
    version=3,
    stage="Production",
    archive_existing_versions=True,  # Archive previous production version
)

# Load production model
model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")
predictions = model.predict(new_data)
graph TB
A[Client Requests] --> B[Triton Inference Server]
B --> C[Model Repository]
C --> C1[PyTorch Model<br/>.pt]
C --> C2[ONNX Model<br/>.onnx]
C --> C3[TensorRT Model<br/>.plan]
C --> C4[Python Backend<br/>.py]
B --> D[Features]
D --> D1[Dynamic Batching]
D --> D2[Model Ensemble]
D --> D3[Concurrent Execution]
D --> D4[GPU Scheduling]
style B fill:#76b900,color:#fff
style D1 fill:#3498db,color:#fff
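Dynamic batching is the feature highlighted in the diagram. The sketch below simulates the general idea in plain Python: requests that arrive close together are grouped so the model runs once per batch rather than once per request. It is an illustration of the concept only, not Triton's scheduler; the function name `form_batches` and its parameters are assumptions made for this example.

```python
from typing import List, Tuple


def form_batches(
    arrivals: List[Tuple[float, str]],
    max_batch_size: int,
    max_queue_delay: float,
) -> List[List[str]]:
    """Group (arrival_time_seconds, request_id) pairs into batches.

    A batch is closed when it is full, or when the next request arrives
    more than `max_queue_delay` seconds after the batch's first request.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    window_start = 0.0
    for arrival_time, request_id in sorted(arrivals):
        batch_full = len(current) >= max_batch_size
        waited_too_long = arrival_time - window_start > max_queue_delay
        if current and (batch_full or waited_too_long):
            batches.append(current)
            current = []
        if not current:
            window_start = arrival_time  # first request opens a new window
        current.append(request_id)
    if current:
        batches.append(current)
    return batches


requests = [(0.000, "a"), (0.001, "b"), (0.002, "c"), (0.010, "d"), (0.011, "e")]
print(form_batches(requests, max_batch_size=2, max_queue_delay=0.005))
```

The two parameters expose the usual trade-off: a larger batch size or a longer queue delay improves GPU throughput at the cost of added latency for the first request in each batch.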
@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}
Monitoring and Drift Detection
graph TB
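A[Model Monitoring Dimensions] --> B[Performance Monitoring]
A --> C[Data Drift]
A --> D[Concept Drift]
A --> E[System Monitoring]
B --> B1[Accuracy/F1 Trend]
B --> B2[Latency P50/P99]
B --> B3[Throughput QPS]
C --> C1[Input Feature Distribution Shift]
C --> C2[PSI / KS Test]
C --> C3[Feature Missing Rate]
D --> D1[Prediction Distribution Shift]
D --> D2[Label vs. Prediction Deviation]
E --> E1[GPU Utilization]
E --> E2[Memory Usage]
E --> E3[Error Rate]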
style C fill:#e74c3c,color:#fff
style D fill:#f39c12,color:#000
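The diagram lists PSI (Population Stability Index) as one data drift metric. A minimal standard-library sketch follows; it bins a baseline sample and a recent sample on the same edges and sums `(actual - expected) * ln(actual / expected)` over the bins. The equal-width binning, the bin count of 10, and the `1e-6` floor for empty bins are choices made for this example rather than fixed parts of the metric.

```python
import math
from typing import List, Sequence


def psi(expected: Sequence[float], actual: Sequence[float], bins: int = 10) -> float:
    """Population Stability Index between a baseline and a recent sample.

    Bin edges are equal-width over the baseline's range; values outside
    that range are clipped into the first or last bin.
    """
    lo, hi = min(expected), max(expected)
    width = ((hi - lo) / bins) or 1.0  # avoid zero width for constant data

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        # A small floor keeps log() and the ratio defined for empty bins.
        return [max(c / len(values), 1e-6) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


baseline = [i / 100 for i in range(100)]        # roughly uniform on [0, 1)
recent = [0.5 + i / 200 for i in range(100)]    # squeezed into [0.5, 1)
print(f"PSI = {psi(baseline, recent):.3f}")
```

PSI is zero when the two binned distributions match and grows as they diverge. Alert thresholds vary by team and should be calibrated against the feature's normal week-to-week variation.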
# Usage
drift_results = detect_drift_ks(training_data, last_week_data)
for feature, result in drift_results.items():
    if result["is_drifted"]:
        print(f"DRIFT DETECTED in {feature}: p-value={result['p_value']:.4f}")
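The definition of `detect_drift_ks` does not appear in this section. The following is a hypothetical reconstruction that matches the usage above, written with the standard library only: it computes the two-sample Kolmogorov-Smirnov statistic per feature and an asymptotic p-value. The input shape (a mapping from feature name to a list of values), the `alpha` parameter, and the `statistic` key are assumptions; in practice `scipy.stats.ks_2samp` is the usual choice for the test itself.

```python
import math
from bisect import bisect_right
from typing import Dict, Mapping, Sequence, Tuple


def ks_two_sample(x: Sequence[float], y: Sequence[float]) -> Tuple[float, float]:
    """Return (D, approximate p-value) for the two-sample KS test."""
    xs, ys = sorted(x), sorted(y)
    n, m = len(xs), len(ys)
    # D is the largest gap between the two empirical CDFs.
    d = max(
        abs(bisect_right(xs, v) / n - bisect_right(ys, v) / m)
        for v in xs + ys
    )
    # Asymptotic Kolmogorov distribution with a small-sample correction.
    en = math.sqrt(n * m / (n + m))
    lam = (en + 0.12 + 0.11 / en) * d
    if lam < 0.2:
        return d, 1.0  # the series below is unreliable here; p is ~1
    p = 2 * sum(
        (-1) ** (k - 1) * math.exp(-2 * (k * lam) ** 2) for k in range(1, 101)
    )
    return d, min(max(p, 0.0), 1.0)


def detect_drift_ks(
    reference: Mapping[str, Sequence[float]],
    current: Mapping[str, Sequence[float]],
    alpha: float = 0.05,
) -> Dict[str, dict]:
    """Run the KS test on every feature present in both datasets."""
    results = {}
    for feature in reference:
        if feature not in current:
            continue
        d, p = ks_two_sample(reference[feature], current[feature])
        results[feature] = {"statistic": d, "p_value": p, "is_drifted": p < alpha}
    return results
```

With many features, testing each at `alpha = 0.05` will flag some by chance alone, so a multiple-comparison correction or a stricter threshold is worth considering before wiring this to an alert.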
# Save report
report.save_html("drift_report.html")

# Get results programmatically
results = report.as_dict()
dataset_drift = results["metrics"][0]["result"]["dataset_drift"]
if dataset_drift:
    print("Dataset drift detected! Consider retraining.")
CI/CD for ML
graph LR
A[Code Push] --> B[CI Pipeline]
B --> C[Data Validation]
C --> D[Unit Tests]
D --> E[Model Training]
E --> F[Model Evaluation]
F --> G{Quality Gate}
G -->|Pass| H[Model Registry]
G -->|Fail| I[Notify & Block]
H --> J[Staging Deploy]
J --> K[A/B Test / Shadow]
K --> L{Performance OK?}
L -->|Yes| M[Production Deploy]
L -->|No| I
style G fill:#f39c12,color:#000
style L fill:#f39c12,color:#000
style M fill:#2ecc71,color:#fff
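The Quality Gate step in the pipeline can be sketched as a small function that a CI job calls after model evaluation. This is one possible shape, not a prescribed implementation: the function name `quality_gate`, the metric names, and the `max_regression` tolerance are assumptions for illustration, and all metrics are assumed to be higher-is-better.

```python
from typing import Dict, List, Tuple


def quality_gate(
    candidate: Dict[str, float],
    baseline: Dict[str, float],
    min_thresholds: Dict[str, float],
    max_regression: float = 0.01,
) -> Tuple[bool, List[str]]:
    """Return (passed, reasons) for a candidate model's evaluation metrics.

    A metric fails if it is missing, below its absolute floor, or worse
    than the current production baseline by more than `max_regression`.
    """
    reasons: List[str] = []
    for metric, floor in min_thresholds.items():
        value = candidate.get(metric)
        if value is None:
            reasons.append(f"{metric}: missing from candidate metrics")
        elif value < floor:
            reasons.append(f"{metric}: {value:.3f} below floor {floor:.3f}")
        elif metric in baseline and baseline[metric] - value > max_regression:
            reasons.append(
                f"{metric}: regressed from {baseline[metric]:.3f} to {value:.3f}"
            )
    return (not reasons, reasons)


passed, reasons = quality_gate(
    candidate={"f1": 0.87, "auc": 0.95},
    baseline={"f1": 0.90, "auc": 0.94},
    min_thresholds={"f1": 0.85, "auc": 0.90},
)
print(passed, reasons)
```

In a CI job, a failed gate would typically exit non-zero so that the pipeline takes the "Notify & Block" branch instead of registering the model.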