32.9. Legacy Enterprise Integration: Brownfield MLOps

Note

The Real World: It is easy to build MLOps in a startup with a clean stack. It is hard to build MLOps when the “System of Record” is a Mainframe from 1982 running COBOL.

For most Fortune 500 companies, the challenge is not “How do I use PyTorch?” but “How do I feed PyTorch with data locked in an AS/400?”


32.9.1. The “Two-Speed IT” Problem

Enterprises run at two speeds:

  1. Fast IT: Cloud, AI, Mobile Apps. Iterates weekly.
  2. Slow IT: Mainframes, ERPs, Core Banking. Iterates yearly.

graph TB
    subgraph "Fast IT (Weeks)"
        A[ML Platform] --> B[Feature Store]
        B --> C[Model Training]
    end
    
    subgraph "Slow IT (Years)"
        E[Mainframe COBOL] --> F[Oracle ERP]
        F --> G[SAP Financials]
    end
    
    C -.->|"Challenge"| E

The Golden Rule: Do not couple Fast IT directly to Slow IT.

| Anti-Pattern | Symptom | Impact |
|---|---|---|
| Direct DB Query | SELECT * FROM PROD | Table locks, outages |
| Synchronous Coupling | ML waits for mainframe | 60s latency |
| Schema Dependency | References 500-column table | Brittle |

32.9.2. Integration Pattern 1: CDC (Change Data Capture)

Do NOT query production databases directly. Use CDC.

graph LR
    A[Mainframe DB2] -->|CDC| B(Debezium)
    B --> C[Kafka]
    C --> D[S3 Data Lake]
    D --> E[Training Pipeline]

CDC Tool Comparison

| Tool | Best For | Latency |
|---|---|---|
| Debezium | Open source, PostgreSQL | Seconds |
| AWS DMS | AWS native, Oracle | Minutes |
| GCP Datastream | GCP native | Seconds |
| Qlik Replicate | Enterprise | Seconds |

Debezium Configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: legacy-postgres-connector
  labels:
    # Must match the name of the Strimzi-managed KafkaConnect cluster
    strimzi.io/cluster: ml-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: legacy-db.internal
    database.port: "5432"
    # Dedicated read-only replication user; inject the password via a Kubernetes
    # Secret config provider rather than hard-coding it here
    database.user: debezium_ml
    database.password: ${secrets:ml-platform/legacy-db-credentials:password}
    database.dbname: production_crm
    database.server.name: legacy_crm
    plugin.name: pgoutput
    slot.name: debezium_ml_slot
    table.include.list: public.customers,public.orders
    snapshot.mode: initial
    snapshot.locking.mode: none
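
Once Debezium is streaming, the ML side consumes change events from Kafka and appends them to the lake or feature store. A minimal consumer sketch, assuming confluent-kafka is installed; the broker address and consumer group are placeholders, and the topic name follows Debezium's server.name.schema.table convention from the config above:

import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka.internal:9092",   # placeholder broker address
    "group.id": "ml-feature-ingest",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["legacy_crm.public.customers"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        event = json.loads(msg.value())
        payload = event.get("payload", event)   # envelope shape depends on converter settings
        if payload.get("op") in ("c", "u"):      # c = insert, u = update
            row = payload["after"]               # new row image, ready for feature extraction
            # ... append to the data lake / feature store here ...
finally:
    consumer.close()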

Terraform: AWS DMS

resource "aws_dms_replication_instance" "legacy_cdc" {
  replication_instance_id    = "legacy-oracle-cdc"
  replication_instance_class = "dms.r5.large"
  allocated_storage          = 100
  multi_az                   = true
  publicly_accessible        = false
}

resource "aws_dms_endpoint" "oracle_source" {
  endpoint_id   = "legacy-oracle-source"
  endpoint_type = "source"
  engine_name   = "oracle"
  server_name   = var.oracle_host
  port          = 1521
  database_name = "PRODDB"
  username      = var.oracle_username
  password      = var.oracle_password
}

resource "aws_dms_endpoint" "s3_target" {
  endpoint_id   = "data-lake-s3-target"
  endpoint_type = "target"
  engine_name   = "s3"
  
  s3_settings {
    # For S3 targets, the service-access role is configured inside s3_settings
    service_access_role_arn = aws_iam_role.dms_s3.arn
    bucket_name             = aws_s3_bucket.data_lake.id
    bucket_folder           = "cdc/oracle"
    compression_type        = "GZIP"
    data_format             = "parquet"
    date_partition_enabled  = true
  }
}

resource "aws_dms_replication_task" "oracle_cdc" {
  replication_task_id      = "oracle-to-s3-cdc"
  replication_instance_arn = aws_dms_replication_instance.legacy_cdc.arn
  source_endpoint_arn      = aws_dms_endpoint.oracle_source.arn
  target_endpoint_arn      = aws_dms_endpoint.s3_target.arn
  migration_type           = "full-load-and-cdc"
  
  table_mappings = jsonencode({
    rules = [{
      rule-type   = "selection"
      rule-id     = "1"
      rule-name   = "include-analytics-schema"
      object-locator = {
        schema-name = "ANALYTICS"
        table-name  = "%"
      }
      rule-action = "include"
    }]
  })
}
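
Downstream, the training pipeline reads the date-partitioned Parquet files DMS lands in the lake. A minimal sketch, assuming pandas with pyarrow and s3fs installed; the bucket name is a placeholder, and the Op column only appears on CDC records:

import pandas as pd

# DMS writes Parquet under <bucket_folder>/<schema>/<table>/ in the target bucket
orders = pd.read_parquet(
    "s3://my-data-lake/cdc/oracle/ANALYTICS/ORDERS/",   # placeholder bucket name
    engine="pyarrow",
)

# During CDC, DMS adds an Op column (I/U/D); drop deletes before feature building
if "Op" in orders.columns:
    orders = orders[orders["Op"] != "D"]
print(orders.shape)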

32.9.3. Integration Pattern 2: Reverse ETL

Predictions are useless in S3. They need to be in Salesforce or SAP.

graph LR
    A[Model Prediction] --> B[Feature Store]
    B --> C[Reverse ETL]
    C --> D[Salesforce]
    C --> E[SAP]
    C --> F[Legacy CRM]

Reverse ETL Implementation

from simple_salesforce import Salesforce
from dataclasses import dataclass
from typing import List, Dict
import backoff

@dataclass
class PredictionRecord:
    customer_id: str
    prediction_score: float
    prediction_label: str
    model_version: str

class SalesforceSync:
    def __init__(self, username: str, password: str, token: str):
        self.sf = Salesforce(
            username=username,
            password=password,
            security_token=token
        )
    
    @backoff.on_exception(backoff.expo, Exception, max_tries=3)
    def sync_batch(self, records: List[PredictionRecord]) -> Dict:
        sf_records = [{
            "External_Customer_ID__c": rec.customer_id,
            "Churn_Score__c": rec.prediction_score,
            "Risk_Level__c": rec.prediction_label,
            "Model_Version__c": rec.model_version,
        } for rec in records]
        
        results = self.sf.bulk.Contact.upsert(
            sf_records,
            "External_Customer_ID__c",
            batch_size=200
        )
        
        success = sum(1 for r in results if r.get("success"))
        return {"success": success, "failed": len(results) - success}

class LegacyDBSync:
    def __init__(self, connection_string: str):
        self.conn_str = connection_string
    
    def sync_batch(self, records: List[PredictionRecord]) -> Dict:
        import pyodbc
        conn = pyodbc.connect(self.conn_str)
        cursor = conn.cursor()
        
        try:
            for rec in records:
                cursor.execute("""
                    MERGE INTO ML_PREDICTIONS AS target
                    USING (SELECT ? AS CUSTOMER_ID) AS source
                    ON target.CUSTOMER_ID = source.CUSTOMER_ID
                    WHEN MATCHED THEN
                        UPDATE SET CHURN_SCORE = ?, RISK_LEVEL = ?
                    WHEN NOT MATCHED THEN
                        INSERT (CUSTOMER_ID, CHURN_SCORE, RISK_LEVEL)
                        VALUES (?, ?, ?);
                """, (rec.customer_id, rec.prediction_score, rec.prediction_label,
                      rec.customer_id, rec.prediction_score, rec.prediction_label))
            conn.commit()
        finally:
            conn.close()
        return {"synced": len(records)}

32.9.4. Integration Pattern 3: Strangler Fig

Replace legacy rules engines incrementally, not all at once.

graph TB
    subgraph "Phase 1: Shadow"
        A[Request] --> B[Gateway]
        B --> C[Legacy Rules]
        B --> D[ML Model]
        C --> E[Response]
        D --> F[Compare Log]
    end
    
    subgraph "Phase 2: Split"
        G[Request] --> H[Gateway]
        H -->|90%| I[Legacy]
        H -->|10%| J[ML]
    end
    
    subgraph "Phase 3: Cutover"
        K[Request] --> L[Gateway]
        L --> M[ML Model]
    end

Traffic Splitting Implementation

from fastapi import FastAPI
from pydantic import BaseModel
import random

app = FastAPI()

class TrafficConfig:
    ml_traffic_pct = 0.0
    shadow_mode = True

config = TrafficConfig()

class DecisionRequest(BaseModel):
    customer_id: str
    amount: float

@app.post("/decide")
async def make_decision(req: DecisionRequest):
    if config.shadow_mode:
        legacy = await call_legacy(req)
        ml = await call_ml(req)
        log_comparison(legacy, ml)
        return legacy
    
    if random.random() < config.ml_traffic_pct:
        try:
            return await call_ml(req)
        except Exception:
            return await call_legacy(req)  # Fall back to legacy on any ML failure
    
    return await call_legacy(req)

@app.post("/admin/traffic")
async def set_traffic(ml_pct: float, shadow: bool = False):
    config.ml_traffic_pct = ml_pct
    config.shadow_mode = shadow
    return {"ml_pct": ml_pct, "shadow": shadow}

32.9.5. Handling EBCDIC and COBOL Data

Mainframes use EBCDIC encoding and COMP-3 (packed decimal) numbers.

import codecs

def decode_ebcdic(data: bytes) -> str:
    return codecs.decode(data, 'cp500').strip()

def decode_comp3(data: bytes, decimals: int = 0) -> float:
    """Decode packed decimal (COMP-3)."""
    digits = []
    sign = 1
    
    for i, byte in enumerate(data):
        high = (byte >> 4) & 0x0F
        low = byte & 0x0F
        
        if i == len(data) - 1:
            digits.append(high)
            if low in (0x0D, 0x0B):
                sign = -1
        else:
            digits.append(high)
            digits.append(low)
    
    num = 0
    for d in digits:
        num = num * 10 + d
    
    if decimals > 0:
        num = num / (10 ** decimals)
    
    return num * sign

def parse_mainframe_record(binary_record: bytes) -> dict:
    # Field 1: Name (EBCDIC, 20 bytes)
    # Field 2: Salary (COMP-3, 4 bytes, 2 decimals)
    name = decode_ebcdic(binary_record[0:20])
    salary = decode_comp3(binary_record[20:24], decimals=2)
    return {"name": name, "salary": salary}

Spark with Cobrix

val df = spark.read
  .format("cobol")
  .option("copybook", "s3://metadata/BANK_ACCT.cpy")
  .load("s3://raw-data/BANK_ACCT.dat")

32.9.6. The Sidecar Pattern for Protocol Translation

Legacy backends often speak SOAP/XML or other protocols the ML stack does not. A sidecar proxy deployed next to the ML service translates at the network boundary, so application code only ever sees JSON over HTTP. The Envoy configuration below sketches this with a Lua filter.

# envoy_sidecar.yaml
static_resources:
  listeners:
  - name: json_to_soap
    address:
      socket_address:
        address: 127.0.0.1
        port_value: 8081
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: json_to_soap
          route_config:
            virtual_hosts:
            - name: backend
              domains: ["*"]
              routes:
              - match:
                  prefix: "/api/legacy/"
                route:
                  cluster: legacy_soap_cluster
          http_filters:
          - name: envoy.filters.http.lua
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua
              inline_code: |
                function envoy_on_request(request_handle)
                  -- Convert the JSON request body to a SOAP envelope
                  -- (build_soap_envelope is a user-supplied Lua helper)
                  local body = request_handle:body()
                  local raw = body:getBytes(0, body:length())
                  local soap = build_soap_envelope(raw)
                  body:setBytes(soap)
                  request_handle:headers():replace("content-type", "text/xml")
                end
          # The router filter must be the last filter in the HTTP chain
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
  - name: legacy_soap_cluster
    connect_timeout: 5s
    type: LOGICAL_DNS
    load_assignment:
      cluster_name: legacy_soap_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: legacy-soap.internal   # placeholder: host of the SOAP backend
                port_value: 8080
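
For clarity, the transformation performed by the build_soap_envelope helper looks roughly like this in Python; the operation name, namespace, and field names are placeholders:

import json

SOAP_TEMPLATE = """<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
  <soap:Body>
    <GetCustomerRisk xmlns="http://legacy.example.com/crm">
      <CustomerId>{customer_id}</CustomerId>
    </GetCustomerRisk>
  </soap:Body>
</soap:Envelope>"""

def build_soap_envelope(json_body: bytes) -> str:
    # Wrap the JSON fields in the SOAP envelope the legacy service expects
    payload = json.loads(json_body)
    return SOAP_TEMPLATE.format(customer_id=payload["customer_id"])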

32.9.7. Anti-Corruption Layer (ACL)

Prevent legacy concepts from polluting the ML system.

from dataclasses import dataclass
from datetime import datetime
import pandas as pd

@dataclass
class CleanCustomer:
    customer_id: str
    email: str
    tenure_days: int
    monthly_spend: float
    risk_segment: str

class CustomerACL:
    def __init__(self, legacy_engine, clean_engine):
        self.legacy = legacy_engine
        self.clean = clean_engine
    
    def run_daily_sync(self):
        legacy_df = self._extract()
        clean_df = self._transform(legacy_df)
        self._load(clean_df)
    
    def _extract(self) -> pd.DataFrame:
        return pd.read_sql("""
            SELECT CUST_ID, EMAIL_ADDR, ACCT_OPEN_DT, 
                   MTH_SPEND_AMT, RISK_CD
            FROM LEGACY_CUSTOMER_MASTER
            WHERE STATUS_CD = 'A'
        """, self.legacy)
    
    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.rename(columns={
            'CUST_ID': 'customer_id',
            'EMAIL_ADDR': 'email',
            'MTH_SPEND_AMT': 'monthly_spend',
            'RISK_CD': 'legacy_risk'
        })
        
        df['tenure_days'] = (datetime.now() - 
            pd.to_datetime(df['ACCT_OPEN_DT'])).dt.days
        
        df['risk_segment'] = df['legacy_risk'].map({
            'H': 'high', 'M': 'medium', 'L': 'low'
        }).fillna('unknown')
        
        df['email'] = df['email'].str.lower().str.strip()
        
        return df[['customer_id', 'email', 'tenure_days', 
                   'monthly_spend', 'risk_segment']]
    
    def _load(self, df: pd.DataFrame):
        df.to_sql('customer_features', self.clean, 
                  if_exists='replace', index=False)
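
Wiring the ACL to real connections might look like this; the SQLAlchemy DSNs and the daily schedule are illustrative assumptions (any orchestrator's nightly job works):

from sqlalchemy import create_engine

# Read-only connection to the legacy warehouse, write access only to the clean store
legacy_engine = create_engine("mssql+pyodbc://readonly@legacy-dw/PROD?driver=ODBC+Driver+17+for+SQL+Server")
clean_engine = create_engine("postgresql+psycopg2://ml@feature-db/features")

acl = CustomerACL(legacy_engine, clean_engine)
acl.run_daily_sync()   # schedule daily from Airflow/cron; the legacy DB is never written to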

32.9.8. Summary Checklist

| Principle | Implementation | Tools |
|---|---|---|
| Don’t Touch | Never write to legacy DB | CDC, Read Replicas |
| Don’t Couple | Use queues to buffer | Kafka, EventBridge |
| Translate Early | Convert to Parquet at edge | Cobrix, Parsers |
| Strangle | Gradual traffic migration | API Gateway |
| Protect | Anti-Corruption Layer | ETL Jobs |

graph TB
    A[Legacy] -->|CDC| B[Kafka]
    B --> C[Data Lake]
    C --> D[ACL]
    D --> E[Feature Store]
    E --> F[Training]
    F --> G[Serving]
    G -->|Reverse ETL| H[Salesforce]

[End of Section 32.9]