MLOps (Machine Learning Operations) bridges the gap between ML development and production deployment. This guide covers essential practices for building robust ML systems.
What is MLOps?
MLOps applies DevOps principles to machine learning, focusing on:
- Reproducible ML pipelines
- Model versioning and registry
- Automated testing and validation
- Continuous training and deployment
- Model monitoring and governance
1. ML Pipeline Architecture
```python
# Example MLOps pipeline with Kubeflow Pipelines (KFP v2 SDK)
from kfp import dsl
from kfp.dsl import Dataset, Input, Model, Output

@dsl.component(packages_to_install=['pandas', 'scikit-learn'])
def preprocess_data(input_path: str, output_data: Output[Dataset]):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(scaler.fit_transform(df), columns=df.columns)
    df_scaled.to_csv(output_data.path, index=False)

@dsl.component(packages_to_install=['pandas', 'scikit-learn', 'joblib'])
def train_model(input_data: Input[Dataset], model: Output[Model]):
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    df = pd.read_csv(input_data.path)
    X, y = df.drop('target', axis=1), df['target']
    clf = RandomForestClassifier(n_estimators=100)
    clf.fit(X, y)
    joblib.dump(clf, model.path)

@dsl.pipeline(name='ml-training-pipeline')
def ml_pipeline(raw_data_path: str = 'raw_data.csv'):
    preprocess_task = preprocess_data(input_path=raw_data_path)
    train_model(input_data=preprocess_task.outputs['output_data'])
```
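Once defined, the pipeline can be compiled to a reusable package and submitted for execution. A minimal sketch follows, assuming a reachable Kubeflow Pipelines endpoint; the host URL, package file name, and run argument are placeholders rather than part of the original setup.

```python
from kfp import compiler
from kfp.client import Client

# Compile the pipeline definition to a portable YAML package
compiler.Compiler().compile(ml_pipeline, package_path='ml_training_pipeline.yaml')

# Submit a run to a Kubeflow Pipelines endpoint (host URL is a placeholder)
client = Client(host='http://localhost:8080')
client.create_run_from_pipeline_package(
    'ml_training_pipeline.yaml',
    arguments={'raw_data_path': 'raw_data.csv'},
)
```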
2. Model Versioning with MLflow
```python
import mlflow
import mlflow.sklearn
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Placeholder dataset so the example runs end to end -- swap in your own features/labels
X, y = make_classification(n_samples=1_000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Start MLflow run
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=10)
    model.fit(X_train, y_train)

    # Log metrics
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    mlflow.log_metric("accuracy", accuracy)

    # Log model
    mlflow.sklearn.log_model(model, "random_forest_model")
    print(f"Model logged with accuracy: {accuracy:.4f}")
```
3. CI/CD for Machine Learning
```yaml
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
  schedule:
    - cron: '0 0 * * 0'  # Weekly retraining

jobs:
  train-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run data validation
        run: python scripts/validate_data.py
      - name: Train model
        run: python scripts/train.py
      - name: Run model tests
        run: pytest tests/test_model.py
      - name: Deploy to staging
        if: success()
        run: python scripts/deploy.py --env staging
```
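The "Run model tests" step assumes a pytest suite that gates deployment, which the workflow references but the guide does not show. Below is a hypothetical sketch of what tests/test_model.py could check; the artifact paths, accuracy threshold, and column names are assumptions about what scripts/train.py produces.

```python
# tests/test_model.py -- hypothetical post-training checks
import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

MODEL_PATH = "model.pkl"            # assumed artifact written by scripts/train.py
HOLDOUT_PATH = "data/holdout.csv"   # assumed held-out evaluation set

def test_model_meets_accuracy_threshold():
    model = joblib.load(MODEL_PATH)
    df = pd.read_csv(HOLDOUT_PATH)
    X, y = df.drop("target", axis=1), df["target"]
    accuracy = accuracy_score(y, model.predict(X))
    assert accuracy >= 0.80, f"Accuracy {accuracy:.3f} below release threshold"

def test_model_predicts_expected_classes():
    model = joblib.load(MODEL_PATH)
    X = pd.read_csv(HOLDOUT_PATH).drop("target", axis=1)
    assert set(model.predict(X)) <= {0, 1}  # binary-classifier assumption
```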
4. Model Monitoring
```python
# Drift / performance report with Evidently's Report API
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset, DataDriftPreset

def monitor_model(reference_data, production_data):
    # Both frames are expected to contain 'target' and 'prediction' columns
    column_mapping = ColumnMapping(target='target', prediction='prediction')

    report = Report(metrics=[
        DataDriftPreset(),
        ClassificationPreset(),
    ])
    report.run(reference_data=reference_data,
               current_data=production_data,
               column_mapping=column_mapping)
    report.save_html("monitoring_report.html")

    # Alert on significant drift (calculate_drift_score / send_alert are project-specific helpers)
    drift_score = calculate_drift_score(reference_data, production_data)
    if drift_score > 0.3:
        send_alert(f"Data drift detected! Score: {drift_score:.2f}")
```
5. Feature Store with Feast
```python
from feast import Entity, FeatureStore, FeatureView, Field
from feast.types import Float64, Int64

# Define entity
customer = Entity(
    name="customer_id",
    join_keys=["customer_id"],
    description="Customer identifier",
)

# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    schema=[
        Field(name="total_purchases", dtype=Float64),
        Field(name="avg_order_value", dtype=Float64),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    online=True,
    source=customer_data_source,  # a batch source (e.g. FileSource) defined in the feature repo
)

# Get features for inference
store = FeatureStore(repo_path="feature_repo/")
features = store.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value",
    ],
    entity_rows=[{"customer_id": 12345}],
).to_dict()
```
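The feature view above references customer_data_source without defining it. Here is a minimal sketch of one way to declare it as a file-backed batch source, plus the materialization call that loads fresh values into the online store once the repo has been applied with feast apply; the parquet path and timestamp column are assumptions.

```python
from datetime import datetime
from feast import FeatureStore, FileSource

# Hypothetical batch source backing customer_features
customer_data_source = FileSource(
    name="customer_stats_source",
    path="data/customer_stats.parquet",  # assumed offline data location
    timestamp_field="event_timestamp",   # assumed event-time column
)

# Push the latest feature values from the offline source into the online store
store = FeatureStore(repo_path="feature_repo/")
store.materialize_incremental(end_date=datetime.utcnow())
```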
MLOps Tools Landscape
| Category | Tools |
|---|---|
| Experiment Tracking | MLflow, Weights & Biases, Neptune |
| Pipeline Orchestration | Kubeflow, Airflow, Prefect, Dagster |
| Feature Store | Feast, Tecton, Hopsworks |
| Model Serving | TensorFlow Serving, Seldon, BentoML |
| Monitoring | Evidently, Arize, WhyLabs |
Implementing these MLOps practices helps keep your ML models reliable, reproducible, and production-ready.