32.9. Legacy Enterprise Integration: Brownfield MLOps
Note
The Real World: It is easy to build MLOps in a startup with a clean stack. It is hard to build MLOps when the “System of Record” is a Mainframe from 1982 running COBOL.
For most Fortune 500 companies, the challenge is not “How do I use PyTorch?” but “How do I feed PyTorch with data locked in an AS/400?”
32.9.1. The “Two-Speed IT” Problem
Enterprises run at two speeds:
- Fast IT: Cloud, AI, Mobile Apps. Iterates weekly.
- Slow IT: Mainframes, ERPs, Core Banking. Iterates yearly.
```mermaid
graph TB
    subgraph "Fast IT (Weeks)"
        A[ML Platform] --> B[Feature Store]
        B --> C[Model Training]
    end
    subgraph "Slow IT (Years)"
        E[Mainframe COBOL] --> F[Oracle ERP]
        F --> G[SAP Financials]
    end
    C -.->|"Challenge"| E
```
The Golden Rule: Do not couple Fast IT directly to Slow IT.
| Anti-Pattern | Symptom | Impact |
|---|---|---|
| Direct DB Query | SELECT * FROM PROD | Table locks, outages |
| Synchronous Coupling | ML waits for mainframe | 60s latency |
| Schema Dependency | References 500-column table | Brittle |
32.9.2. Integration Pattern 1: CDC (Change Data Capture)
Do NOT query production databases directly. Use CDC.
```mermaid
graph LR
    A[Mainframe DB2] -->|CDC| B(Debezium)
    B --> C[Kafka]
    C --> D[S3 Data Lake]
    D --> E[Training Pipeline]
```
CDC Tool Comparison
| Tool | Best For | Latency |
|---|---|---|
| Debezium | Open source, PostgreSQL | Seconds |
| AWS DMS | AWS native, Oracle | Minutes |
| GCP Datastream | GCP native | Seconds |
| Qlik Replicate | Enterprise | Seconds |
Debezium Configuration

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: legacy-postgres-connector
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    database.hostname: legacy-db.internal
    database.port: "5432"
    database.dbname: production_crm
    database.server.name: legacy_crm
    plugin.name: pgoutput
    slot.name: debezium_ml_slot
    table.include.list: public.customers,public.orders
    snapshot.mode: initial
```
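Downstream, each Kafka message carries Debezium's change-event envelope rather than a bare row. A minimal sketch of unwrapping it in Python (the field names follow Debezium's standard `payload`/`op`/`before`/`after` format; the Kafka consumer loop itself is omitted):

```python
import json
from typing import Optional


def extract_row(event: dict) -> Optional[dict]:
    """Return the post-image of a Debezium change event, or None for deletes."""
    payload = event.get("payload", event)
    if payload.get("op") == "d":   # delete events carry only a "before" image
        return None
    return payload.get("after")


# A simplified update event, shaped like Debezium's output for public.customers
sample = json.loads("""
{"payload": {"op": "u",
             "before": {"id": 42, "email": "OLD@EXAMPLE.COM"},
             "after":  {"id": 42, "email": "new@example.com"},
             "source": {"table": "customers"}}}
""")
row = extract_row(sample)
```

Deletes deserve explicit handling: a training pipeline that silently drops them will resurrect churned customers in the feature store.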
Terraform: AWS DMS

```hcl
resource "aws_dms_replication_instance" "legacy_cdc" {
  replication_instance_id    = "legacy-oracle-cdc"
  replication_instance_class = "dms.r5.large"
  allocated_storage          = 100
  multi_az                   = true
  publicly_accessible        = false
}

resource "aws_dms_endpoint" "oracle_source" {
  endpoint_id   = "legacy-oracle-source"
  endpoint_type = "source"
  engine_name   = "oracle"
  server_name   = var.oracle_host
  port          = 1521
  database_name = "PRODDB"
  username      = var.oracle_username
  password      = var.oracle_password
}

resource "aws_dms_endpoint" "s3_target" {
  endpoint_id   = "data-lake-s3-target"
  endpoint_type = "target"
  engine_name   = "s3"

  s3_settings {
    bucket_name             = aws_s3_bucket.data_lake.id
    bucket_folder           = "cdc/oracle"
    compression_type        = "GZIP"
    data_format             = "parquet"
    date_partition_enabled  = true
    service_access_role_arn = aws_iam_role.dms_s3.arn
  }
}

resource "aws_dms_replication_task" "oracle_cdc" {
  replication_task_id      = "oracle-to-s3-cdc"
  replication_instance_arn = aws_dms_replication_instance.legacy_cdc.arn
  source_endpoint_arn      = aws_dms_endpoint.oracle_source.arn
  target_endpoint_arn      = aws_dms_endpoint.s3_target.arn
  migration_type           = "full-load-and-cdc"

  table_mappings = jsonencode({
    rules = [{
      rule-type = "selection"
      rule-id   = "1"
      object-locator = {
        schema-name = "ANALYTICS"
        table-name  = "%"
      }
      rule-action = "include"
    }]
  })
}
```
32.9.3. Integration Pattern 2: Reverse ETL
Predictions sitting in S3 are useless to the business. They need to land where people actually work: Salesforce, SAP, or the legacy CRM.
```mermaid
graph LR
    A[Model Prediction] --> B[Feature Store]
    B --> C[Reverse ETL]
    C --> D[Salesforce]
    C --> E[SAP]
    C --> F[Legacy CRM]
```
Reverse ETL Implementation

```python
from dataclasses import dataclass
from typing import Dict, List

import backoff
from simple_salesforce import Salesforce


@dataclass
class PredictionRecord:
    customer_id: str
    prediction_score: float
    prediction_label: str
    model_version: str


class SalesforceSync:
    def __init__(self, username: str, password: str, token: str):
        self.sf = Salesforce(
            username=username,
            password=password,
            security_token=token,
        )

    @backoff.on_exception(backoff.expo, Exception, max_tries=3)
    def sync_batch(self, records: List[PredictionRecord]) -> Dict:
        sf_records = [{
            "External_Customer_ID__c": rec.customer_id,
            "Churn_Score__c": rec.prediction_score,
            "Risk_Level__c": rec.prediction_label,
            "Model_Version__c": rec.model_version,
        } for rec in records]
        results = self.sf.bulk.Contact.upsert(
            sf_records,
            "External_Customer_ID__c",
            batch_size=200,
        )
        success = sum(1 for r in results if r.get("success"))
        return {"success": success, "failed": len(results) - success}


class LegacyDBSync:
    def __init__(self, connection_string: str):
        self.conn_str = connection_string

    def sync_batch(self, records: List[PredictionRecord]) -> Dict:
        import pyodbc  # lazy import: only needed when syncing to the legacy DB

        conn = pyodbc.connect(self.conn_str)
        try:
            cursor = conn.cursor()
            for rec in records:
                cursor.execute("""
                    MERGE INTO ML_PREDICTIONS AS target
                    USING (SELECT ? AS CUSTOMER_ID) AS source
                    ON target.CUSTOMER_ID = source.CUSTOMER_ID
                    WHEN MATCHED THEN
                        UPDATE SET CHURN_SCORE = ?, RISK_LEVEL = ?
                    WHEN NOT MATCHED THEN
                        INSERT (CUSTOMER_ID, CHURN_SCORE, RISK_LEVEL)
                        VALUES (?, ?, ?);
                """, (rec.customer_id, rec.prediction_score, rec.prediction_label,
                      rec.customer_id, rec.prediction_score, rec.prediction_label))
            conn.commit()
        finally:
            conn.close()
        return {"synced": len(records)}
```
32.9.4. Integration Pattern 3: Strangler Fig
Replace legacy rules engines incrementally, not all at once.
```mermaid
graph TB
    subgraph "Phase 1: Shadow"
        A[Request] --> B[Gateway]
        B --> C[Legacy Rules]
        B --> D[ML Model]
        C --> E[Response]
        D --> F[Compare Log]
    end
    subgraph "Phase 2: Split"
        G[Request] --> H[Gateway]
        H -->|90%| I[Legacy]
        H -->|10%| J[ML]
    end
    subgraph "Phase 3: Cutover"
        K[Request] --> L[Gateway]
        L --> M[ML Model]
    end
```
Traffic Splitting Implementation

```python
import random

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class TrafficConfig:
    ml_traffic_pct = 0.0
    shadow_mode = True


config = TrafficConfig()


class DecisionRequest(BaseModel):
    customer_id: str
    amount: float


# call_legacy, call_ml, and log_comparison are service clients defined elsewhere.
@app.post("/decide")
async def make_decision(req: DecisionRequest):
    if config.shadow_mode:
        legacy = await call_legacy(req)
        ml = await call_ml(req)
        log_comparison(legacy, ml)
        return legacy  # shadow mode: ML output is logged, never served
    if random.random() < config.ml_traffic_pct:
        try:
            return await call_ml(req)
        except Exception:
            return await call_legacy(req)  # fall back to legacy on ML failure
    return await call_legacy(req)


@app.post("/admin/traffic")
async def set_traffic(ml_pct: float, shadow: bool = False):
    config.ml_traffic_pct = ml_pct
    config.shadow_mode = shadow
    return {"ml_pct": ml_pct, "shadow": shadow}
```
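Phase 1 only pays off if the comparison logs are actually analyzed before traffic shifts. A minimal sketch of the cutover gate (the 95% agreement threshold and the 1,000-sample minimum are illustrative assumptions, not fixed rules):

```python
def agreement_rate(comparisons) -> float:
    """comparisons: (legacy_decision, ml_decision) pairs from shadow-mode logs."""
    if not comparisons:
        return 0.0
    agree = sum(1 for legacy, ml in comparisons if legacy == ml)
    return agree / len(comparisons)


def ready_to_split(comparisons, threshold: float = 0.95,
                   min_samples: int = 1000) -> bool:
    """Gate for moving from Phase 1 (shadow) to Phase 2 (split traffic)."""
    return len(comparisons) >= min_samples and agreement_rate(comparisons) >= threshold
```

Disagreements are as valuable as the rate itself: each one is either a legacy-rules bug or a model bug, and triaging them is how trust in the cutover is earned.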
32.9.5. Handling EBCDIC and COBOL Data
Mainframes use EBCDIC encoding and COMP-3 (packed decimal) numbers.
```python
import codecs


def decode_ebcdic(data: bytes) -> str:
    """Decode EBCDIC (code page 500) text."""
    return codecs.decode(data, 'cp500').strip()


def decode_comp3(data: bytes, decimals: int = 0) -> float:
    """Decode packed decimal (COMP-3): two digits per byte, sign in the last nibble."""
    digits = []
    sign = 1
    for i, byte in enumerate(data):
        high = (byte >> 4) & 0x0F
        low = byte & 0x0F
        if i == len(data) - 1:
            digits.append(high)
            if low in (0x0D, 0x0B):  # D/B sign nibbles mean negative
                sign = -1
        else:
            digits.append(high)
            digits.append(low)
    num = 0
    for d in digits:
        num = num * 10 + d
    if decimals > 0:
        num = num / (10 ** decimals)
    return num * sign


def parse_mainframe_record(binary_record: bytes) -> dict:
    # Field 1: Name (EBCDIC, 20 bytes)
    # Field 2: Salary (COMP-3, 4 bytes, 2 decimals)
    name = decode_ebcdic(binary_record[0:20])
    salary = decode_comp3(binary_record[20:24], decimals=2)
    return {"name": name, "salary": salary}
```
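These decoders are easy to get subtly wrong, so it pays to unit-test them against generated fixtures rather than live mainframe extracts. `encode_comp3` below is a hypothetical inverse helper for building test bytes; it is not part of any mainframe toolchain:

```python
def encode_comp3(value: float, decimals: int = 0) -> bytes:
    """Pack a number into COMP-3 bytes (hypothetical test-fixture helper)."""
    scaled = round(abs(value) * (10 ** decimals))
    digits = str(scaled)
    if len(digits) % 2 == 0:       # odd digit count + sign nibble = whole bytes
        digits = "0" + digits
    sign = 0x0D if value < 0 else 0x0C
    nibbles = [int(d) for d in digits] + [sign]
    return bytes((nibbles[i] << 4) | nibbles[i + 1]
                 for i in range(0, len(nibbles), 2))


# 12345.67 with 2 decimals packs to 0x12 0x34 0x56 0x7C
fixture = encode_comp3(12345.67, decimals=2)
```

A full fixture record for `parse_mainframe_record` can then be built as `"ACME CORP".ljust(20).encode('cp500') + encode_comp3(12345.67, decimals=2)`.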
Spark with Cobrix

```scala
val df = spark.read
  .format("cobol")
  .option("copybook", "s3://metadata/BANK_ACCT.cpy")
  .load("s3://raw-data/BANK_ACCT.dat")
```
32.9.6. The Sidecar Pattern for Protocol Translation
A sidecar proxy lets the ML service speak plain JSON/REST while the legacy backend keeps its SOAP/XML interface; neither side has to change.

```yaml
# envoy_sidecar.yaml
static_resources:
  listeners:
    - name: json_to_soap
      address:
        socket_address:
          address: 127.0.0.1
          port_value: 8081
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                route_config:
                  virtual_hosts:
                    - name: backend
                      domains: ["*"]
                      routes:
                        - match:
                            prefix: "/api/legacy/"
                          route:
                            cluster: legacy_soap_cluster
                http_filters:
                  - name: envoy.filters.http.lua
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua
                      inline_code: |
                        function envoy_on_request(request_handle)
                          -- Convert JSON to SOAP
                          local body = request_handle:body():getBytes(0, -1)
                          local soap = build_soap_envelope(body)
                          request_handle:body():setBytes(soap)
                          request_handle:headers():replace("content-type", "text/xml")
                        end
```
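The `build_soap_envelope` call in the Lua filter is left undefined; here is a sketch of the translation it performs, written in Python for clarity. The operation name and envelope shape are illustrative assumptions, since the real structure depends on the legacy service's WSDL:

```python
def build_soap_envelope(json_body: dict, operation: str = "GetCustomer") -> str:
    """Wrap a flat JSON payload in a minimal SOAP 1.1 envelope (illustrative)."""
    params = "".join(f"<{k}>{v}</{k}>" for k, v in json_body.items())
    return (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">'
        f"<soap:Body><{operation}>{params}</{operation}></soap:Body>"
        "</soap:Envelope>"
    )
```

A production version would escape XML entities and map nested JSON; the point is that the translation lives in the sidecar, not in the ML service.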
32.9.7. Anti-Corruption Layer (ACL)
Prevent legacy concepts from polluting the ML system.
```python
from dataclasses import dataclass
from datetime import datetime

import pandas as pd


@dataclass
class CleanCustomer:
    customer_id: str
    email: str
    tenure_days: int
    monthly_spend: float
    risk_segment: str


class CustomerACL:
    def __init__(self, legacy_engine, clean_engine):
        self.legacy = legacy_engine
        self.clean = clean_engine

    def run_daily_sync(self):
        legacy_df = self._extract()
        clean_df = self._transform(legacy_df)
        self._load(clean_df)

    def _extract(self) -> pd.DataFrame:
        return pd.read_sql("""
            SELECT CUST_ID, EMAIL_ADDR, ACCT_OPEN_DT,
                   MTH_SPEND_AMT, RISK_CD
            FROM LEGACY_CUSTOMER_MASTER
            WHERE STATUS_CD = 'A'
        """, self.legacy)

    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.rename(columns={
            'CUST_ID': 'customer_id',
            'EMAIL_ADDR': 'email',
            'MTH_SPEND_AMT': 'monthly_spend',
            'RISK_CD': 'legacy_risk',
        })
        df['tenure_days'] = (datetime.now() -
                             pd.to_datetime(df['ACCT_OPEN_DT'])).dt.days
        df['risk_segment'] = df['legacy_risk'].map({
            'H': 'high', 'M': 'medium', 'L': 'low'
        }).fillna('unknown')
        df['email'] = df['email'].str.lower().str.strip()
        return df[['customer_id', 'email', 'tenure_days',
                   'monthly_spend', 'risk_segment']]

    def _load(self, df: pd.DataFrame):
        df.to_sql('customer_features', self.clean,
                  if_exists='replace', index=False)
```
32.9.8. Summary Checklist
| Principle | Implementation | Tools |
|---|---|---|
| Don’t Touch | Never write to legacy DB | CDC, Read Replicas |
| Don’t Couple | Use queues to buffer | Kafka, EventBridge |
| Translate Early | Convert to Parquet at edge | Cobrix, Parsers |
| Strangle | Gradual traffic migration | API Gateway |
| Protect | Anti-Corruption Layer | ETL Jobs |
```mermaid
graph TB
    A[Legacy] -->|CDC| B[Kafka]
    B --> C[Data Lake]
    C --> D[ACL]
    D --> E[Feature Store]
    E --> F[Training]
    F --> G[Serving]
    G -->|Reverse ETL| H[Salesforce]
```