MLOps with Kubernetes: The Enterprise ML Revolution

MLOps with Kubernetes is transforming how companies develop, deploy, and manage machine learning. Combining Kubernetes orchestration with ML-specific workflows creates a scalable, automated platform for enterprise machine learning.

Why MLOps with Kubernetes?

  • Up to 70% faster ML model development
  • Up to 85% higher deployment reliability
  • Up to 60% cost savings through optimized resource utilization
  • Full reproducibility and compliance

Enterprise benefits:

  • Automated ML pipelines from development to production
  • Scalable infrastructure for large ML workloads
  • Multi-tenant support for different teams
  • Governance and compliance for critical ML applications

MLOps Architecture with Kubernetes

The Kubernetes-Based MLOps Stack

## mlops-kubernetes-architecture.yaml
mlops_platform:
  orchestration:
    kubernetes: 'Container orchestration'
    kubeflow: 'ML workflow management'
    argo_workflows: 'Pipeline orchestration'

  model_management:
    mlflow: 'Experiment tracking & model registry'
    kubeflow_metadata: 'ML metadata management'
    model_versioning: 'Git-based versioning'

  pipeline_automation:
    tekton: 'CI/CD for ML pipelines'
    kubeflow_pipelines: 'ML-specific workflows'
    argo_events: 'Event-driven triggers'

  monitoring_observability:
    prometheus: 'Metrics collection'
    grafana: 'Visualization'
    kubeflow_katib: 'Hyperparameter optimization'
    model_monitoring: 'Production model monitoring'

  storage_data:
    minio: 'Object storage for ML artifacts'
    postgresql: 'Metadata database'
    redis: 'Caching and session management'
    elasticsearch: 'Log aggregation'
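
A quick way to verify that these components are actually up is to query their deployments through the Kubernetes API. The following is a minimal sketch, assuming the platform components run in the kubeflow namespace:

## check_mlops_stack.py
from kubernetes import client, config

# Assumes in-cluster execution; use config.load_kube_config() when running locally
config.load_incluster_config()
apps_v1 = client.AppsV1Api()

# Report readiness of every deployment in the platform namespace
for deployment in apps_v1.list_namespaced_deployment(namespace="kubeflow").items:
    ready = deployment.status.ready_replicas or 0
    wanted = deployment.spec.replicas
    print(f"{deployment.metadata.name}: {ready}/{wanted} replicas ready")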

Kubeflow Integration for MLOps

## kubeflow_mlops_setup.py
import kfp
import kubernetes
from kfp import dsl
from kfp.components import create_component_from_func

class KubeflowMLOps:
    def __init__(self, namespace="kubeflow"):
        self.namespace = namespace
        self.client = kfp.Client()

    def setup_mlops_environment(self):
        """Set up the MLOps environment in Kubernetes."""

        # Create the Kubeflow namespace via the Kubernetes API
        kubernetes.config.load_incluster_config()
        namespace = kubernetes.client.V1Namespace(
            metadata=kubernetes.client.V1ObjectMeta(name=self.namespace)
        )
        kubernetes.client.CoreV1Api().create_namespace(namespace)

        # Install the Kubeflow components (install_kubeflow_component is
        # assumed to apply the corresponding manifests, e.g. via kustomize)
        kubeflow_components = [
            "kubeflow-pipelines",
            "kubeflow-metadata",
            "kubeflow-katib",
            "kubeflow-serving"
        ]

        for component in kubeflow_components:
            self.install_kubeflow_component(component)

    def create_ml_pipeline(self, pipeline_name):
        """Create an ML pipeline with Kubeflow."""

        @dsl.pipeline(
            name=pipeline_name,
            description="Enterprise ML pipeline with Kubeflow"
        )
        def ml_pipeline(input_data_path: str = "/data/raw.csv"):
            # Each create_*_step() returns a component factory; calling it
            # yields a pipeline task. In a real cluster the file paths would
            # be exchanged as artifacts or via shared volumes.

            # Data preprocessing
            preprocessing = self.create_preprocessing_step()(
                input_data_path, "/data/preprocessed.csv")

            # Feature engineering
            features = self.create_feature_engineering_step()(
                preprocessing.output, "/data/features.csv")

            # Model training
            training = self.create_training_step()(
                features.output, "/models/model.joblib")

            # Model evaluation
            evaluation = self.create_evaluation_step()(
                training.output, features.output)

            # Model deployment (only if the model performs well enough)
            with dsl.Condition(evaluation.output > 0.8):
                self.create_deployment_step()(training.output, pipeline_name)

        return ml_pipeline

    def create_preprocessing_step(self):
        """Data preprocessing step."""
        return create_component_from_func(
            func=self.preprocess_data,
            base_image="python:3.9-slim",
            packages_to_install=["pandas", "scikit-learn", "numpy"]
        )

    def create_feature_engineering_step(self):
        """Feature engineering step."""
        return create_component_from_func(
            func=self.engineer_features,
            base_image="python:3.9-slim",
            packages_to_install=["pandas", "scikit-learn", "feature-engine"]
        )

    def create_training_step(self):
        """Model training step."""
        return create_component_from_func(
            func=self.train_model,
            base_image="python:3.9-slim",
            packages_to_install=["scikit-learn", "mlflow", "xgboost"]
        )

    def create_evaluation_step(self):
        """Model evaluation step."""
        return create_component_from_func(
            func=self.evaluate_model,
            base_image="python:3.9-slim",
            packages_to_install=["scikit-learn", "mlflow", "pandas"]
        )

    def create_deployment_step(self):
        """Model deployment step."""
        return create_component_from_func(
            func=self.deploy_model,
            base_image="python:3.9-slim",
            packages_to_install=["kubernetes", "mlflow", "scikit-learn"]
        )

    # Pipeline functions. KFP lightweight components must be self-contained:
    # all imports happen inside the function body and no instance state is
    # used, which is why these functions are static methods.
    @staticmethod
    def preprocess_data(input_data_path: str, output_data_path: str) -> str:
        """Data preprocessing."""
        import pandas as pd
        from sklearn.preprocessing import StandardScaler

        # Load the data
        data = pd.read_csv(input_data_path)

        # Impute missing values (numeric columns only)
        data = data.fillna(data.mean(numeric_only=True))

        # One-hot encode categorical variables
        categorical_columns = data.select_dtypes(include=['object']).columns
        data = pd.get_dummies(data, columns=categorical_columns)

        # Scale numerical features
        scaler = StandardScaler()
        numerical_columns = data.select_dtypes(include=['float64', 'int64']).columns
        data[numerical_columns] = scaler.fit_transform(data[numerical_columns])

        # Persist the processed data
        data.to_csv(output_data_path, index=False)

        return output_data_path

    @staticmethod
    def engineer_features(input_data_path: str, output_data_path: str) -> str:
        """Feature engineering."""
        import pandas as pd

        data = pd.read_csv(input_data_path)

        # Create new features, e.g. interactions between numerical features
        numerical_columns = data.select_dtypes(include=['float64', 'int64']).columns

        for i, col1 in enumerate(numerical_columns):
            for col2 in numerical_columns[i+1:]:
                interaction_name = f"{col1}_{col2}_interaction"
                data[interaction_name] = data[col1] * data[col2]

        # Polynomial features for important variables
        important_features = numerical_columns[:3]  # top 3 features
        for feature in important_features:
            data[f"{feature}_squared"] = data[feature] ** 2

        data.to_csv(output_data_path, index=False)
        return output_data_path

    @staticmethod
    def train_model(input_data_path: str, model_output_path: str) -> str:
        """Model training with MLflow tracking."""
        import joblib
        import mlflow
        import mlflow.sklearn
        import pandas as pd
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.model_selection import train_test_split

        # Configure MLflow
        mlflow.set_tracking_uri("http://mlflow-service:5000")

        # Load the data
        data = pd.read_csv(input_data_path)

        # Separate features and target
        X = data.drop('target', axis=1)
        y = data['target']

        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Start an MLflow run
        with mlflow.start_run():
            # Train the model
            model = RandomForestClassifier(n_estimators=100, random_state=42)
            model.fit(X_train, y_train)

            # Log metrics
            train_score = model.score(X_train, y_train)
            test_score = model.score(X_test, y_test)

            mlflow.log_metric("train_accuracy", train_score)
            mlflow.log_metric("test_accuracy", test_score)
            mlflow.log_param("n_estimators", 100)

            # Log the model to MLflow
            mlflow.sklearn.log_model(model, "model")

            # Persist the model for the next pipeline step
            joblib.dump(model, model_output_path)

        return model_output_path

    @staticmethod
    def evaluate_model(model_path: str, test_data_path: str) -> float:
        """Model evaluation."""
        import joblib
        import mlflow
        import pandas as pd
        from sklearn.metrics import accuracy_score

        # Load the model
        model = joblib.load(model_path)

        # Load the test data
        test_data = pd.read_csv(test_data_path)
        X_test = test_data.drop('target', axis=1)
        y_test = test_data['target']

        # Predict
        y_pred = model.predict(X_test)

        # Compute metrics
        accuracy = accuracy_score(y_test, y_pred)

        # Log the metric to MLflow
        mlflow.set_tracking_uri("http://mlflow-service:5000")
        with mlflow.start_run():
            mlflow.log_metric("final_accuracy", accuracy)

        return accuracy

    @staticmethod
    def deploy_model(model_path: str, deployment_name: str, namespace: str = "kubeflow") -> str:
        """Deploy the model to Kubernetes."""
        import joblib
        import mlflow
        import mlflow.sklearn
        from kubernetes import client, config

        # Configure Kubernetes
        config.load_incluster_config()

        # Log the trained model in a fresh run and register it in the
        # MLflow Model Registry (the original training run is not active here)
        mlflow.set_tracking_uri("http://mlflow-service:5000")
        with mlflow.start_run() as run:
            mlflow.sklearn.log_model(joblib.load(model_path), "model")
            mlflow.register_model(
                model_uri=f"runs:/{run.info.run_id}/model",
                name=deployment_name
            )

        # Create the Kubernetes Deployment
        deployment = client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=client.V1ObjectMeta(name=f"{deployment_name}-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(
                    match_labels={"app": deployment_name}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": deployment_name}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name=deployment_name,
                                image="mlflow-model-serving:latest",
                                ports=[client.V1ContainerPort(container_port=8080)],
                                env=[
                                    client.V1EnvVar(
                                        name="MODEL_URI",
                                        value=f"models:/{deployment_name}/latest"
                                    )
                                ]
                            )
                        ]
                    )
                )
            )
        )

        # Apply the deployment
        apps_v1 = client.AppsV1Api()
        apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment
        )

        return f"Model {deployment_name} deployed successfully"

MLflow Integration for Experiment Tracking

MLflow with Kubernetes

## mlflow_kubernetes_integration.py
import os

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from kubernetes import client, config

class MLflowKubernetesIntegration:
    def __init__(self, tracking_uri="http://mlflow-service:5000"):
        self.tracking_uri = tracking_uri
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

        # Kubernetes konfigurieren
        config.load_incluster_config()

    def setup_mlflow_in_kubernetes(self):
        """Set up MLflow in Kubernetes."""

        # Create the MLflow service
        service = client.V1Service(
            api_version="v1",
            kind="Service",
            metadata=client.V1ObjectMeta(name="mlflow-service"),
            spec=client.V1ServiceSpec(
                selector={"app": "mlflow"},
                ports=[client.V1ServicePort(port=5000, target_port=5000)]
            )
        )

        # Create the MLflow tracking-server deployment
        deployment = client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=client.V1ObjectMeta(name="mlflow-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=1,
                selector=client.V1LabelSelector(match_labels={"app": "mlflow"}),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": "mlflow"}),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="mlflow",
                                # Assumes an MLflow server image is available
                                # in the cluster registry
                                image="mlflow:latest",
                                command=["mlflow", "server",
                                         "--host", "0.0.0.0", "--port", "5000"],
                                ports=[client.V1ContainerPort(container_port=5000)],
                                env=[
                                    client.V1EnvVar(name="MLFLOW_BACKEND_STORE_URI", value="sqlite:///mlflow.db"),
                                    client.V1EnvVar(name="MLFLOW_DEFAULT_ARTIFACT_ROOT", value="s3://mlflow-artifacts")
                                ]
                            )
                        ]
                    )
                )
            )
        )

        # Apply the resources
        core_v1 = client.CoreV1Api()
        apps_v1 = client.AppsV1Api()

        core_v1.create_namespaced_service(namespace="kubeflow", body=service)
        apps_v1.create_namespaced_deployment(namespace="kubeflow", body=deployment)

    def create_experiment(self, experiment_name: str, description: str = ""):
        """Create an MLflow experiment."""
        # MlflowClient.create_experiment has no description parameter;
        # descriptions are stored in the mlflow.note.content tag.
        experiment_id = self.client.create_experiment(
            name=experiment_name,
            tags={"mlflow.note.content": description} if description else None
        )
        return experiment_id

    def log_model_experiment(self, experiment_name: str, model, metrics: dict, params: dict):
        """Log a model experiment to MLflow."""
        mlflow.set_experiment(experiment_name)

        with mlflow.start_run():
            # Log parameters
            for key, value in params.items():
                mlflow.log_param(key, value)

            # Log metrics
            for key, value in metrics.items():
                mlflow.log_metric(key, value)

            # Log the model
            mlflow.sklearn.log_model(model, "model")

            # Log additional artifacts if they exist
            if os.path.exists("model_performance_report.html"):
                mlflow.log_artifact("model_performance_report.html")

    def register_model(self, model_name: str, run_id: str):
        """Register a model in the MLflow Model Registry."""
        model_uri = f"runs:/{run_id}/model"

        registered_model = mlflow.register_model(
            model_uri=model_uri,
            name=model_name
        )

        # Move the newly created version to staging
        self.client.transition_model_version_stage(
            name=model_name,
            version=registered_model.version,
            stage="Staging"
        )

        return registered_model

    def promote_model_to_production(self, model_name: str, model_version: str):
        """Promote a model version to production."""
        self.client.transition_model_version_stage(
            name=model_name,
            version=model_version,
            stage="Production"
        )

        # Update the Kubernetes deployment
        self.update_production_deployment(model_name, model_version)

    def update_production_deployment(self, model_name: str, model_version: str):
        """Update the production deployment."""
        apps_v1 = client.AppsV1Api()

        # Load the deployment
        deployment = apps_v1.read_namespaced_deployment(
            name=f"{model_name}-deployment",
            namespace="kubeflow"
        )

        # Point the serving container at the new model version
        deployment.spec.template.spec.containers[0].env = [
            client.V1EnvVar(
                name="MODEL_URI",
                value=f"models:/{model_name}/{model_version}"
            )
        ]

        # Patch the deployment
        apps_v1.patch_namespaced_deployment(
            name=f"{model_name}-deployment",
            namespace="kubeflow",
            body=deployment
        )
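
A typical round trip through the class could look like the sketch below; the toy dataset, model names, and experiment name are purely illustrative:

## mlflow_usage_example.py
import mlflow
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

from mlflow_kubernetes_integration import MLflowKubernetesIntegration

integration = MLflowKubernetesIntegration()
integration.create_experiment("churn-prediction", description="Churn model experiments")

# Train a toy model and log it
X, y = make_classification(n_samples=200, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42).fit(X, y)
integration.log_model_experiment(
    "churn-prediction",
    model,
    metrics={"train_accuracy": model.score(X, y)},
    params={"n_estimators": 100},
)

# Register the run's model and promote it once validated
# (promotion assumes a churn-model-deployment already exists in the cluster)
run_id = mlflow.last_active_run().info.run_id
registered = integration.register_model("churn-model", run_id)
integration.promote_model_to_production("churn-model", registered.version)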

Automated ML Pipelines with Tekton

CI/CD for Machine Learning

## tekton-ml-pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: ml-pipeline
spec:
  params:
    - name: git-url
    - name: git-revision
    - name: model-name
    - name: experiment-name

  workspaces:
    - name: shared-workspace

  tasks:
    - name: fetch-repository
      taskRef:
        name: git-clone
      workspaces:
        - name: output
          workspace: shared-workspace
      params:
        - name: url
          value: $(params.git-url)
        - name: revision
          value: $(params.git-revision)

    - name: run-tests
      runAfter: ['fetch-repository']
      taskRef:
        name: python-test
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: args
          value: ['-m', 'pytest', 'tests/']

    - name: train-model
      runAfter: ['run-tests']
      taskRef:
        name: mlflow-train
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: experiment-name
          value: $(params.experiment-name)
        - name: model-name
          value: $(params.model-name)

    - name: evaluate-model
      runAfter: ['train-model']
      taskRef:
        name: mlflow-evaluate
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: model-name
          value: $(params.model-name)

    - name: deploy-model
      runAfter: ['evaluate-model']
      taskRef:
        name: kubernetes-deploy
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: model-name
          value: $(params.model-name)
      when:
        # Tekton when-expressions only support string matching, so the
        # evaluate task emits an explicit deploy-approved flag instead of
        # a raw accuracy value.
        - input: '$(tasks.evaluate-model.results.deploy-approved)'
          operator: in
          values: ['true']
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: mlflow-train
spec:
  params:
    - name: experiment-name
    - name: model-name
  steps:
    - name: train
      image: python:3.9-slim
      script: |
        pip install mlflow scikit-learn pandas numpy
        python train.py --experiment $(params.experiment-name) --model $(params.model-name)
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: mlflow-evaluate
spec:
  params:
    - name: model-name
  results:
    - name: accuracy
    - name: deploy-approved
  steps:
    - name: evaluate
      image: python:3.9-slim
      script: |
        pip install mlflow scikit-learn
        accuracy=$(python evaluate.py --model $(params.model-name))
        printf '%s' "$accuracy" > $(results.accuracy.path)
        # Emit an explicit deploy flag (threshold mirrors the pipeline gate)
        python -c "print('true' if $accuracy >= 0.8 else 'false', end='')" > $(results.deploy-approved.path)
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: kubernetes-deploy
spec:
  params:
    - name: model-name
  steps:
    - name: deploy
      image: bitnami/kubectl:latest
      script: |
        kubectl apply -f k8s/$(params.model-name)-deployment.yaml
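
Triggering this pipeline amounts to creating a PipelineRun resource. A minimal sketch using the Kubernetes Python client (repository URL, namespace, and parameter values are placeholders):

## trigger_ml_pipelinerun.py
from kubernetes import client, config

config.load_incluster_config()

# PipelineRun binding ml-pipeline to concrete parameters
pipeline_run = {
    "apiVersion": "tekton.dev/v1beta1",
    "kind": "PipelineRun",
    "metadata": {"generateName": "ml-pipeline-run-"},
    "spec": {
        "pipelineRef": {"name": "ml-pipeline"},
        "params": [
            {"name": "git-url", "value": "https://example.com/org/ml-repo.git"},
            {"name": "git-revision", "value": "main"},
            {"name": "model-name", "value": "churn-model"},
            {"name": "experiment-name", "value": "churn-prediction"},
        ],
        "workspaces": [{
            "name": "shared-workspace",
            "volumeClaimTemplate": {
                "spec": {
                    "accessModes": ["ReadWriteOnce"],
                    "resources": {"requests": {"storage": "1Gi"}},
                }
            },
        }],
    },
}

client.CustomObjectsApi().create_namespaced_custom_object(
    group="tekton.dev", version="v1beta1", namespace="mlops-enterprise",
    plural="pipelineruns", body=pipeline_run,
)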

Model Monitoring and Observability

Monitoring Models in Production

## model_monitoring.py
import time

import mlflow
import numpy as np
import pandas as pd
from kubernetes import config
from prometheus_client import Counter, Gauge, Histogram

class ModelMonitoring:
    def __init__(self, model_name: str):
        self.model_name = model_name

        # Prometheus metrics
        self.prediction_counter = Counter(
            'model_predictions_total',
            'Total number of model predictions',
            ['model_name', 'status']
        )

        self.prediction_latency = Histogram(
            'model_prediction_duration_seconds',
            'Model prediction latency',
            ['model_name']
        )

        self.model_accuracy = Gauge(
            'model_accuracy',
            'Model accuracy over time',
            ['model_name']
        )

        self.data_drift_score = Gauge(
            'data_drift_score',
            'Data drift detection score',
            ['model_name']
        )

        # Configure MLflow
        mlflow.set_tracking_uri("http://mlflow-service:5000")

        # Configure Kubernetes
        config.load_incluster_config()

    def monitor_prediction(self, predict_fn, input_data, actual=None):
        """Monitor a single prediction; predict_fn wraps the model call."""
        start_time = time.time()

        try:
            # Run the prediction
            prediction = predict_fn(input_data)

            # Record latency
            latency = time.time() - start_time
            self.prediction_latency.labels(model_name=self.model_name).observe(latency)

            # Count successful predictions
            self.prediction_counter.labels(
                model_name=self.model_name,
                status="success"
            ).inc()

            # Record accuracy when ground truth is available
            if actual is not None:
                accuracy = 1.0 if prediction == actual else 0.0
                self.model_accuracy.labels(model_name=self.model_name).set(accuracy)

            return prediction

        except Exception:
            # Count failed predictions
            self.prediction_counter.labels(
                model_name=self.model_name,
                status="error"
            ).inc()
            raise

    def detect_data_drift(self, current_data: pd.DataFrame, reference_data: pd.DataFrame):
        """Data drift detection via two-sample KS tests."""
        from scipy import stats

        drift_scores = {}

        for column in current_data.columns:
            if column in reference_data.columns:
                # Kolmogorov-Smirnov test for numerical features
                if current_data[column].dtype in ['float64', 'int64']:
                    ks_statistic, p_value = stats.ks_2samp(
                        current_data[column],
                        reference_data[column]
                    )
                    # A low p-value indicates the distributions differ (drift)
                    drift_scores[column] = p_value

        # Overall drift score (mean p-value across features)
        overall_drift = np.mean(list(drift_scores.values()))
        self.data_drift_score.labels(model_name=self.model_name).set(overall_drift)

        return drift_scores, overall_drift

    def create_monitoring_dashboard(self):
        """Create a Grafana dashboard configuration for model monitoring."""
        dashboard_config = {
            "dashboard": {
                "title": f"Model Monitoring - {self.model_name}",
                "panels": [
                    {
                        "title": "Prediction Rate",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": f"rate(model_predictions_total{{model_name=\"{self.model_name}\"}}[5m])",
                                "legendFormat": "Predictions/sec"
                            }
                        ]
                    },
                    {
                        "title": "Prediction Latency",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": f"histogram_quantile(0.95, rate(model_prediction_duration_seconds_bucket{{model_name=\"{self.model_name}\"}}[5m]))",
                                "legendFormat": "95th percentile"
                            }
                        ]
                    },
                    {
                        "title": "Model Accuracy",
                        "type": "singlestat",
                        "targets": [
                            {
                                "expr": f"model_accuracy{{model_name=\"{self.model_name}\"}}",
                                "legendFormat": "Accuracy"
                            }
                        ]
                    },
                    {
                        "title": "Data Drift Score",
                        "type": "singlestat",
                        "targets": [
                            {
                                "expr": f"data_drift_score{{model_name=\"{self.model_name}\"}}",
                                "legendFormat": "Drift Score"
                            }
                        ]
                    }
                ]
            }
        }

        return dashboard_config

    def setup_alerting(self, alert_rules: dict):
        """Set up alerting rules for model monitoring."""
        prometheus_rules = []

        # Accuracy alert
        if 'accuracy_threshold' in alert_rules:
            prometheus_rules.append({
                "alert": f"{self.model_name}_low_accuracy",
                "expr": f"model_accuracy{{model_name=\"{self.model_name}\"}} < {alert_rules['accuracy_threshold']}",
                "for": "5m",
                "labels": {
                    "severity": "warning",
                    "model": self.model_name
                },
                "annotations": {
                    "summary": f"Model {self.model_name} accuracy below threshold",
                    # Quadruple braces render as literal {{ $value }} for Prometheus templating
                    "description": f"Model accuracy is {{{{ $value }}}} which is below the threshold of {alert_rules['accuracy_threshold']}"
                }
            })

        # Latency alert
        if 'latency_threshold' in alert_rules:
            prometheus_rules.append({
                "alert": f"{self.model_name}_high_latency",
                "expr": f"histogram_quantile(0.95, rate(model_prediction_duration_seconds_bucket{{model_name=\"{self.model_name}\"}}[5m])) > {alert_rules['latency_threshold']}",
                "for": "2m",
                "labels": {
                    "severity": "warning",
                    "model": self.model_name
                },
                "annotations": {
                    "summary": f"Model {self.model_name} high latency",
                    "description": f"Model prediction latency is {{{{ $value }}}}s which is above the threshold of {alert_rules['latency_threshold']}s"
                }
            })

        # Data drift alert (a low mean p-value indicates drift)
        if 'drift_threshold' in alert_rules:
            prometheus_rules.append({
                "alert": f"{self.model_name}_data_drift",
                "expr": f"data_drift_score{{model_name=\"{self.model_name}\"}} < {alert_rules['drift_threshold']}",
                "for": "10m",
                "labels": {
                    "severity": "critical",
                    "model": self.model_name
                },
                "annotations": {
                    "summary": f"Data drift detected for {self.model_name}",
                    "description": f"Data drift score is {{{{ $value }}}} which is below the threshold of {alert_rules['drift_threshold']}"
                }
            })

        return prometheus_rules
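
Wiring the class into a serving process could look like the following sketch; the model path and metrics port are illustrative, and start_http_server exposes the metrics endpoint for Prometheus to scrape:

## serve_with_monitoring.py
import joblib
from prometheus_client import start_http_server

from model_monitoring import ModelMonitoring

# Expose /metrics on port 8000 for the Prometheus scraper
start_http_server(8000)

model = joblib.load("/models/churn-model.joblib")  # illustrative path
monitoring = ModelMonitoring(model_name="churn-model")

def handle_request(features):
    # Records latency, throughput, and errors for every prediction
    return monitoring.monitor_prediction(model.predict, features)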

Enterprise MLOps Best Practices

Multi-Tenant MLOps Architecture

## enterprise-mlops-architecture.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: mlops-enterprise
  labels:
    name: mlops-enterprise
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: mlops-enterprise
  name: mlops-developer
rules:
  - apiGroups: ['']
    resources: ['pods', 'services', 'configmaps']
    verbs: ['get', 'list', 'create', 'update', 'delete']
  - apiGroups: ['apps']
    resources: ['deployments']
    verbs: ['get', 'list', 'create', 'update', 'delete']
  - apiGroups: ['kubeflow.org']
    resources: ['experiments', 'runs', 'recurringruns']
    verbs: ['get', 'list', 'create', 'update', 'delete']
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: mlops-enterprise
  name: mlops-admin
rules:
  - apiGroups: ['']
    resources: ['*']
    verbs: ['*']
  - apiGroups: ['kubeflow.org']
    resources: ['*']
    verbs: ['*']
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: mlops-developer
  namespace: mlops-enterprise
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: mlops-developer-binding
  namespace: mlops-enterprise
subjects:
  - kind: ServiceAccount
    name: mlops-developer
    namespace: mlops-enterprise
roleRef:
  kind: Role
  name: mlops-developer
  apiGroup: rbac.authorization.k8s.io

Resource Management and Quotas

## mlops-resource-quotas.yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: mlops-quota
  namespace: mlops-enterprise
spec:
  hard:
    requests.cpu: '16'
    requests.memory: 32Gi
    limits.cpu: '32'
    limits.memory: 64Gi
    persistentvolumeclaims: '10'
    services: '20'
    pods: '50'
---
apiVersion: v1
kind: LimitRange
metadata:
  name: mlops-limits
  namespace: mlops-enterprise
spec:
  limits:
    - default:
        cpu: 1000m
        memory: 2Gi
      defaultRequest:
        cpu: 500m
        memory: 1Gi
      type: Container
    # Pod-level limits support only min/max; defaults apply per container
    - max:
        cpu: 2000m
        memory: 4Gi
      type: Pod
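
Whether a team is approaching its quota can be checked programmatically. A minimal sketch reading the quota status via the Python client:

## check_quota_usage.py
from kubernetes import client, config

config.load_incluster_config()
core_v1 = client.CoreV1Api()

# Compare consumed resources against the hard limits of each quota
for quota in core_v1.list_namespaced_resource_quota(namespace="mlops-enterprise").items:
    for resource, hard in quota.status.hard.items():
        used = quota.status.used.get(resource, "0")
        print(f"{resource}: {used} used of {hard}")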

Conclusion: MLOps with Kubernetes for the Enterprise

MLOps with Kubernetes offers German enterprises a powerful platform for machine learning:

Technology benefits:

  • Kubeflow for ML workflow management
  • MLflow for experiment tracking and model registry
  • Tekton for CI/CD pipelines
  • Prometheus/Grafana for monitoring

Enterprise benefits:

  • Multi-tenant support for different teams
  • Resource management and quotas
  • Governance and compliance for critical ML applications
  • Scalable infrastructure for large ML workloads

Next steps:

  1. Install Kubeflow in your Kubernetes cluster
  2. Set up MLflow for experiment tracking
  3. Configure CI/CD pipelines with Tekton
  4. Set up monitoring and alerting

MLOps with Kubernetes makes enterprise machine learning scalable, reproducible, and production-ready.

