Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

34.1. Video Stream Processing: RTSP, Kinesis & GStreamer

Note

The High-Bandwidth Challenge: A single 1080p 30fps stream is ~5 Mbps. A thousand cameras is 5 Gbps. Your “CSV” MLOps stack will melt.


34.1.1. The Video Pipeline Anatomy

graph LR
    A[IP Camera] -->|RTSP| B[GStreamer Ingest]
    B -->|MKV| C[Kinesis Video]
    C --> D[Decoder]
    D -->|RGB| E[ML Inference]
    E --> F[Analytics]
StageFunctionBottleneckTypical Latency
IngestRTSP captureNetwork stability50-200ms
TransportCloud bufferingBandwidth cost100-500ms
DecodeH264 → RGBCPU/GPU cycles10-50ms
InferenceObject detectionGPU memory20-100ms
Post-processTracking, alertsCPU5-20ms

Video ML vs Traditional ML

DimensionTraditional MLVideo ML
Data rateGB/dayTB/hour
Latency toleranceSeconds-minutesMilliseconds
ProcessingBatchStreaming
InfrastructureCPU clustersGPU + specialized decoders
Cost driverComputeBandwidth + storage

34.1.2. GStreamer for RTSP Ingestion

GStreamer is the gold standard for video capture. OpenCV’s VideoCapture falls apart under real-world conditions.

Basic RTSP Capture

import cv2
import numpy as np
from typing import Optional, Tuple
import threading
import queue
import time

class RTSPCapture:
    """Robust RTSP capture using GStreamer backend."""
    
    def __init__(
        self, 
        rtsp_url: str, 
        use_gpu: bool = False,
        buffer_size: int = 1,
        reconnect_attempts: int = 5
    ):
        self.rtsp_url = rtsp_url
        self.use_gpu = use_gpu
        self.buffer_size = buffer_size
        self.reconnect_attempts = reconnect_attempts
        
        self.pipeline = self._build_pipeline()
        self.cap = None
        self._connect()
        
        # Threading for non-blocking reads
        self.frame_queue = queue.Queue(maxsize=buffer_size)
        self.running = False
        self._thread = None
    
    def _build_pipeline(self) -> str:
        """Build GStreamer pipeline string."""
        decoder = "nvdec" if self.use_gpu else "avdec_h264"
        
        pipeline = (
            f"rtspsrc location={self.rtsp_url} latency=0 "
            f"protocols=tcp drop-on-latency=true ! "
            f"rtph264depay ! h264parse ! {decoder} ! "
            f"videoconvert ! video/x-raw,format=BGR ! "
            f"appsink max-buffers=1 drop=true sync=false"
        )
        return pipeline
    
    def _connect(self) -> bool:
        """Attempt to connect to RTSP stream."""
        for attempt in range(self.reconnect_attempts):
            self.cap = cv2.VideoCapture(self.pipeline, cv2.CAP_GSTREAMER)
            
            if self.cap.isOpened():
                print(f"Connected to {self.rtsp_url}")
                return True
            
            print(f"Connection attempt {attempt + 1} failed, retrying...")
            time.sleep(2 ** attempt)  # Exponential backoff
        
        raise ConnectionError(f"Failed to connect to {self.rtsp_url}")
    
    def start(self) -> None:
        """Start background capture thread."""
        self.running = True
        self._thread = threading.Thread(target=self._capture_loop, daemon=True)
        self._thread.start()
    
    def _capture_loop(self) -> None:
        """Background thread for continuous capture."""
        consecutive_failures = 0
        max_failures = 10
        
        while self.running:
            ret, frame = self.cap.read()
            
            if not ret:
                consecutive_failures += 1
                if consecutive_failures >= max_failures:
                    print("Connection lost, attempting reconnect...")
                    try:
                        self._connect()
                        consecutive_failures = 0
                    except ConnectionError:
                        print("Reconnection failed")
                        break
                continue
            
            consecutive_failures = 0
            
            # Drop old frames if queue is full
            if self.frame_queue.full():
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
            
            self.frame_queue.put((time.time(), frame))
    
    def read(self, timeout: float = 1.0) -> Tuple[bool, Optional[np.ndarray], float]:
        """Read frame with timeout.
        
        Returns:
            (success, frame, timestamp)
        """
        try:
            timestamp, frame = self.frame_queue.get(timeout=timeout)
            return True, frame, timestamp
        except queue.Empty:
            return False, None, 0.0
    
    def stop(self) -> None:
        """Stop capture and release resources."""
        self.running = False
        if self._thread:
            self._thread.join(timeout=2.0)
        if self.cap:
            self.cap.release()


# Usage
cap = RTSPCapture(
    "rtsp://192.168.1.50:554/stream1",
    use_gpu=True,
    buffer_size=1
)
cap.start()

while True:
    success, frame, ts = cap.read()
    if not success:
        continue
    
    # Run inference
    results = model(frame)
    
    # Calculate end-to-end latency
    e2e_latency = time.time() - ts
    print(f"E2E latency: {e2e_latency*1000:.1f}ms")

Multi-Camera Manager

from dataclasses import dataclass
from typing import Dict, List, Callable
import concurrent.futures
import threading

@dataclass
class CameraConfig:
    camera_id: str
    rtsp_url: str
    zone: str
    use_gpu: bool = True
    priority: int = 1  # 1=high, 2=medium, 3=low

class MultiCameraManager:
    """Manage multiple RTSP streams with resource allocation."""
    
    def __init__(
        self, 
        configs: List[CameraConfig],
        max_concurrent: int = 10
    ):
        self.configs = {c.camera_id: c for c in configs}
        self.captures: Dict[str, RTSPCapture] = {}
        self.max_concurrent = max_concurrent
        self.executor = concurrent.futures.ThreadPoolExecutor(max_concurrent)
        self._lock = threading.Lock()
    
    def start_all(self) -> None:
        """Start all camera captures."""
        # Sort by priority
        sorted_configs = sorted(
            self.configs.values(), 
            key=lambda c: c.priority
        )
        
        for config in sorted_configs:
            self._start_camera(config)
    
    def _start_camera(self, config: CameraConfig) -> None:
        """Start individual camera capture."""
        try:
            cap = RTSPCapture(
                config.rtsp_url,
                use_gpu=config.use_gpu
            )
            cap.start()
            
            with self._lock:
                self.captures[config.camera_id] = cap
            
            print(f"Started camera: {config.camera_id}")
        except ConnectionError as e:
            print(f"Failed to start camera {config.camera_id}: {e}")
    
    def process_all(
        self, 
        inference_fn: Callable[[np.ndarray], dict],
        callback: Callable[[str, dict], None]
    ) -> None:
        """Process all camera feeds with inference function."""
        
        def process_camera(camera_id: str) -> None:
            cap = self.captures.get(camera_id)
            if not cap:
                return
            
            success, frame, ts = cap.read(timeout=0.1)
            if not success:
                return
            
            results = inference_fn(frame)
            results["camera_id"] = camera_id
            results["timestamp"] = ts
            
            callback(camera_id, results)
        
        # Submit all cameras for processing
        futures = [
            self.executor.submit(process_camera, cam_id)
            for cam_id in self.captures.keys()
        ]
        
        # Wait for completion
        concurrent.futures.wait(futures, timeout=1.0)
    
    def get_stats(self) -> dict:
        """Get statistics for all cameras."""
        return {
            "total_cameras": len(self.configs),
            "active_cameras": len(self.captures),
            "camera_status": {
                cam_id: "active" if cam_id in self.captures else "disconnected"
                for cam_id in self.configs.keys()
            }
        }
    
    def stop_all(self) -> None:
        """Stop all cameras."""
        for cap in self.captures.values():
            cap.stop()
        self.captures.clear()
        self.executor.shutdown(wait=True)

34.1.3. AWS Kinesis Video Streams

For cloud-scale video ingestion, Kinesis Video Streams provides durability and integration.

Terraform Infrastructure

# kinesis_video.tf

variable "cameras" {
  type = map(object({
    device_name = string
    zone        = string
    retention_hours = number
  }))
}

resource "aws_kinesis_video_stream" "camera" {
  for_each = var.cameras
  
  name                    = "camera-${each.key}"
  data_retention_in_hours = each.value.retention_hours
  device_name             = each.value.device_name
  media_type              = "video/h264"
  
  tags = {
    Environment = var.environment
    Zone        = each.value.zone
    ManagedBy   = "terraform"
  }
}

resource "aws_iam_role" "kvs_producer" {
  name = "kvs-producer-${var.environment}"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = { Service = "kinesisvideo.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "kvs_producer_policy" {
  name = "kvs-producer-policy"
  role = aws_iam_role.kvs_producer.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "kinesisvideo:PutMedia",
          "kinesisvideo:GetDataEndpoint",
          "kinesisvideo:DescribeStream"
        ]
        Resource = [for s in aws_kinesis_video_stream.camera : s.arn]
      }
    ]
  })
}

# Lambda for ML processing
resource "aws_lambda_function" "frame_processor" {
  function_name = "kvs-frame-processor-${var.environment}"
  role          = aws_iam_role.lambda_processor.arn
  handler       = "handler.process_frame"
  runtime       = "python3.11"
  memory_size   = 1024
  timeout       = 30
  
  # Use container image for ML dependencies
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.ml_processor.repository_url}:latest"
  
  environment {
    variables = {
      MODEL_PATH = "s3://${var.model_bucket}/yolov8n.onnx"
    }
  }
  
  vpc_config {
    subnet_ids         = var.subnet_ids
    security_group_ids = [aws_security_group.lambda.id]
  }
}

# Connect KVS to Lambda
resource "aws_lambda_event_source_mapping" "kvs_trigger" {
  for_each = aws_kinesis_video_stream.camera
  
  event_source_arn  = each.value.arn
  function_name     = aws_lambda_function.frame_processor.arn
  starting_position = "LATEST"
  
  batch_size = 1
}

Consuming from Kinesis Video

import boto3
from amazon_kinesis_video_consumer_library.kinesis_video_streams_parser import (
    KinesisVideoStreamsParser,
)
import numpy as np
import av
from io import BytesIO

class KVSConsumer:
    """Consume frames from Kinesis Video Streams."""
    
    def __init__(self, stream_name: str, region: str = "us-east-1"):
        self.stream_name = stream_name
        self.region = region
        self.kvs_client = boto3.client("kinesisvideo", region_name=region)
        
    def get_media_endpoint(self) -> str:
        """Get the media endpoint for the stream."""
        response = self.kvs_client.get_data_endpoint(
            StreamName=self.stream_name,
            APIName="GET_MEDIA"
        )
        return response["DataEndpoint"]
    
    def get_frames(self, start_selector: dict = None):
        """Generator that yields frames from the stream."""
        endpoint = self.get_media_endpoint()
        kvs_media = boto3.client(
            "kinesis-video-media",
            endpoint_url=endpoint,
            region_name=self.region
        )
        
        if start_selector is None:
            start_selector = {"StartSelectorType": "NOW"}
        
        response = kvs_media.get_media(
            StreamName=self.stream_name,
            StartSelector=start_selector
        )
        
        # Parse MKV stream
        parser = KinesisVideoStreamsParser()
        
        for chunk in response["Payload"].iter_chunks():
            for fragment in parser.parse(chunk):
                for frame in self._decode_fragment(fragment):
                    yield frame
    
    def _decode_fragment(self, fragment: bytes) -> list:
        """Decode MKV fragment to RGB frames."""
        frames = []
        
        container = av.open(BytesIO(fragment))
        for frame in container.decode(video=0):
            img = frame.to_ndarray(format="bgr24")
            frames.append({
                "image": img,
                "pts": frame.pts,
                "timestamp": frame.time
            })
        
        return frames


# Usage with inference
def process_stream(stream_name: str, model):
    """Process KVS stream with ML model."""
    consumer = KVSConsumer(stream_name)
    
    for frame_data in consumer.get_frames():
        image = frame_data["image"]
        timestamp = frame_data["timestamp"]
        
        # Run inference
        results = model(image)
        
        # Process detections
        for detection in results.boxes:
            print(f"[{timestamp}] Detected: {detection.cls} at {detection.xyxy}")

34.1.4. Frame Sampling Strategy

Processing every frame is wasteful. Smart sampling reduces compute by 10-100x.

StrategyWhen to UseCompute SavingsAccuracy Impact
Every N framesUniform samplingLow (if N≤10)
I-Frames onlyLow-motion scenes30×Medium
Motion-triggeredSecurity cameras50-100×Very low
Scene changeContent analysisVariableLow
Adaptive rateMixed content10-50×Very low

I-Frame Extraction

import subprocess
import os
from pathlib import Path
from typing import List, Optional
import tempfile

def extract_iframes(
    video_path: str, 
    output_dir: Optional[str] = None,
    quality: int = 2  # 1-31, lower is better
) -> List[str]:
    """Extract I-frames only for efficient processing.
    
    Args:
        video_path: Path to input video
        output_dir: Directory for output frames (temp if None)
        quality: JPEG quality (1=best, 31=worst)
    
    Returns:
        List of frame file paths
    """
    if output_dir is None:
        output_dir = tempfile.mkdtemp(prefix="iframes_")
    
    os.makedirs(output_dir, exist_ok=True)
    
    cmd = [
        "ffmpeg", "-i", video_path,
        "-vf", "select='eq(pict_type,PICT_TYPE_I)'",
        "-vsync", "vfr",
        "-q:v", str(quality),
        f"{output_dir}/frame_%06d.jpg"
    ]
    
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed: {result.stderr}")
    
    frames = sorted([
        os.path.join(output_dir, f) 
        for f in os.listdir(output_dir) 
        if f.endswith('.jpg')
    ])
    
    return frames


def extract_with_timestamps(video_path: str) -> List[dict]:
    """Extract I-frames with their timestamps."""
    
    # Get I-frame timestamps
    cmd = [
        "ffprobe", "-v", "quiet",
        "-select_streams", "v:0",
        "-show_entries", "frame=pict_type,pts_time",
        "-of", "csv=p=0",
        video_path
    ]
    
    result = subprocess.run(cmd, capture_output=True, text=True)
    
    iframes = []
    for line in result.stdout.strip().split("\n"):
        parts = line.split(",")
        if len(parts) == 2 and parts[0] == "I":
            iframes.append({"type": "I", "timestamp": float(parts[1])})
    
    return iframes

Motion-Based Sampling

import cv2
import numpy as np
from collections import deque
from typing import Generator, Tuple

class MotionSampler:
    """Sample frames based on motion detection."""
    
    def __init__(
        self,
        motion_threshold: float = 0.02,
        min_interval: float = 0.1,
        cooldown_frames: int = 5
    ):
        self.motion_threshold = motion_threshold
        self.min_interval = min_interval
        self.cooldown_frames = cooldown_frames
        
        self.prev_frame = None
        self.frame_buffer = deque(maxlen=3)
        self.last_sample_time = 0
        self.cooldown_counter = 0
    
    def calculate_motion(self, frame: np.ndarray) -> float:
        """Calculate motion score between frames."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)
        
        if self.prev_frame is None:
            self.prev_frame = gray
            return 0.0
        
        # Frame difference
        diff = cv2.absdiff(self.prev_frame, gray)
        _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
        
        # Motion score = percentage of changed pixels
        motion_score = np.sum(thresh > 0) / thresh.size
        
        self.prev_frame = gray
        return motion_score
    
    def should_process(
        self, 
        frame: np.ndarray, 
        timestamp: float
    ) -> Tuple[bool, float]:
        """Determine if frame should be processed.
        
        Returns:
            (should_process, motion_score)
        """
        motion_score = self.calculate_motion(frame)
        
        # Enforce minimum interval
        if timestamp - self.last_sample_time < self.min_interval:
            return False, motion_score
        
        # Check cooldown
        if self.cooldown_counter > 0:
            self.cooldown_counter -= 1
            return False, motion_score
        
        # Check motion threshold
        if motion_score > self.motion_threshold:
            self.last_sample_time = timestamp
            self.cooldown_counter = self.cooldown_frames
            return True, motion_score
        
        return False, motion_score
    
    def process_stream(
        self, 
        capture: RTSPCapture
    ) -> Generator[Tuple[np.ndarray, float, float], None, None]:
        """Generator that yields frames when motion detected."""
        
        capture.start()
        
        while True:
            success, frame, timestamp = capture.read()
            if not success:
                continue
            
            should_process, motion_score = self.should_process(frame, timestamp)
            
            if should_process:
                yield frame, timestamp, motion_score


# Usage
sampler = MotionSampler(motion_threshold=0.03)

for frame, ts, motion in sampler.process_stream(camera):
    print(f"Motion detected: {motion:.2%}")
    results = model(frame)

Adaptive Rate Sampling

from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class SamplingConfig:
    base_fps: float = 1.0
    max_fps: float = 10.0
    min_fps: float = 0.1
    activity_boost_factor: float = 2.0
    decay_rate: float = 0.9

class AdaptiveSampler:
    """Dynamically adjust frame rate based on content activity."""
    
    def __init__(self, config: SamplingConfig = None):
        self.config = config or SamplingConfig()
        self.current_fps = self.config.base_fps
        self.last_sample_time = 0
        self.activity_score = 0.0
    
    def update_activity(self, detections: int, motion: float) -> None:
        """Update activity score based on inference results."""
        # Combine detection count and motion
        new_activity = (detections * 0.5) + (motion * 10)
        
        # Exponential moving average
        self.activity_score = (
            0.7 * self.activity_score + 
            0.3 * new_activity
        )
        
        # Adjust FPS
        if self.activity_score > 1.0:
            self.current_fps = min(
                self.current_fps * self.config.activity_boost_factor,
                self.config.max_fps
            )
        else:
            self.current_fps = max(
                self.current_fps * self.config.decay_rate,
                self.config.min_fps
            )
    
    def should_sample(self) -> bool:
        """Check if we should sample based on current FPS."""
        current_time = time.time()
        interval = 1.0 / self.current_fps
        
        if current_time - self.last_sample_time >= interval:
            self.last_sample_time = current_time
            return True
        
        return False
    
    def get_stats(self) -> dict:
        return {
            "current_fps": round(self.current_fps, 2),
            "activity_score": round(self.activity_score, 2),
            "sample_interval_ms": round(1000 / self.current_fps, 1)
        }

34.1.5. Latency Comparison

ProtocolTypical LatencyReliabilityUse Case
RTSP/TCP1-3sHighRecording, analytics
RTSP/UDP500ms-1sMediumLower latency streaming
HLS6-30sVery HighBroadcast, CDN distribution
DASH3-20sVery HighAdaptive bitrate streaming
WebRTC100-500msMediumReal-time interaction
Direct UDP50-200msLowRobot control, gaming

Latency Breakdown

gantt
    title Video Pipeline Latency Breakdown
    dateFormat X
    axisFormat %L ms
    
    section Capture
    Camera encode    :0, 30
    Network transfer :30, 80
    
    section Ingest
    RTSP parse       :80, 90
    Buffer/sync      :90, 120
    
    section Decode
    H264 decode      :120, 150
    Format convert   :150, 160
    
    section ML
    Preprocess       :160, 170
    Inference        :170, 220
    Post-process     :220, 235
    
    section Output
    Result publish   :235, 245

Measuring End-to-End Latency

import time
import cv2
import numpy as np
from dataclasses import dataclass, field
from typing import List
from statistics import mean, stdev

@dataclass
class LatencyMeasurement:
    capture_time: float
    decode_time: float
    preprocess_time: float
    inference_time: float
    postprocess_time: float
    
    @property
    def total(self) -> float:
        return (
            self.decode_time + 
            self.preprocess_time + 
            self.inference_time + 
            self.postprocess_time
        )

class LatencyTracker:
    """Track detailed latency metrics for video pipeline."""
    
    def __init__(self, window_size: int = 100):
        self.measurements: List[LatencyMeasurement] = []
        self.window_size = window_size
    
    def record(self, measurement: LatencyMeasurement) -> None:
        self.measurements.append(measurement)
        if len(self.measurements) > self.window_size:
            self.measurements.pop(0)
    
    def get_stats(self) -> dict:
        if not self.measurements:
            return {}
        
        def calc_stats(values: List[float]) -> dict:
            return {
                "mean": round(mean(values) * 1000, 2),
                "std": round(stdev(values) * 1000, 2) if len(values) > 1 else 0,
                "min": round(min(values) * 1000, 2),
                "max": round(max(values) * 1000, 2)
            }
        
        return {
            "decode_ms": calc_stats([m.decode_time for m in self.measurements]),
            "preprocess_ms": calc_stats([m.preprocess_time for m in self.measurements]),
            "inference_ms": calc_stats([m.inference_time for m in self.measurements]),
            "postprocess_ms": calc_stats([m.postprocess_time for m in self.measurements]),
            "total_ms": calc_stats([m.total for m in self.measurements]),
            "sample_count": len(self.measurements)
        }


def benchmark_pipeline(
    capture: RTSPCapture, 
    model, 
    num_frames: int = 100
) -> dict:
    """Benchmark full pipeline latency."""
    tracker = LatencyTracker()
    
    capture.start()
    processed = 0
    
    while processed < num_frames:
        success, frame, capture_time = capture.read()
        if not success:
            continue
        
        # Decode (already done by GStreamer, measure overhead)
        t0 = time.perf_counter()
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        t1 = time.perf_counter()
        
        # Preprocess
        preprocessed = model.preprocess(frame_rgb)
        t2 = time.perf_counter()
        
        # Inference
        outputs = model.infer(preprocessed)
        t3 = time.perf_counter()
        
        # Postprocess
        results = model.postprocess(outputs)
        t4 = time.perf_counter()
        
        tracker.record(LatencyMeasurement(
            capture_time=capture_time,
            decode_time=t1 - t0,
            preprocess_time=t2 - t1,
            inference_time=t3 - t2,
            postprocess_time=t4 - t3
        ))
        
        processed += 1
    
    capture.stop()
    return tracker.get_stats()

34.1.6. WebRTC for Low Latency

When sub-second latency is critical, WebRTC is the answer.

import asyncio
from aiortc import RTCPeerConnection, VideoStreamTrack, RTCSessionDescription
from aiortc.contrib.media import MediaBlackhole
from av import VideoFrame
import numpy as np

class MLVideoTrack(VideoStreamTrack):
    """Process video with ML and forward results."""
    
    kind = "video"
    
    def __init__(self, source_track, model):
        super().__init__()
        self.source = source_track
        self.model = model
        self.frame_count = 0
    
    async def recv(self) -> VideoFrame:
        frame = await self.source.recv()
        
        # Convert to numpy
        img = frame.to_ndarray(format="bgr24")
        
        # Run inference (should be async in production)
        loop = asyncio.get_event_loop()
        results = await loop.run_in_executor(None, self.model, img)
        
        # Draw results
        annotated = self.draw_detections(img, results)
        
        # Convert back to frame
        new_frame = VideoFrame.from_ndarray(annotated, format="bgr24")
        new_frame.pts = frame.pts
        new_frame.time_base = frame.time_base
        
        self.frame_count += 1
        return new_frame
    
    def draw_detections(self, image: np.ndarray, results) -> np.ndarray:
        """Draw detection boxes on image."""
        import cv2
        
        for box in results.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            conf = box.conf[0]
            cls = int(box.cls[0])
            
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(
                image, 
                f"{cls}: {conf:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2
            )
        
        return image


class WebRTCMLServer:
    """WebRTC server with ML processing."""
    
    def __init__(self, model):
        self.model = model
        self.peers = {}
    
    async def handle_offer(self, offer_sdp: str, peer_id: str) -> str:
        """Handle WebRTC offer and return answer."""
        pc = RTCPeerConnection()
        self.peers[peer_id] = pc
        
        @pc.on("track")
        async def on_track(track):
            if track.kind == "video":
                # Wrap with ML processing
                ml_track = MLVideoTrack(track, self.model)
                pc.addTrack(ml_track)
        
        @pc.on("connectionstatechange")
        async def on_connection_state_change():
            if pc.connectionState == "closed":
                del self.peers[peer_id]
        
        # Set remote description
        await pc.setRemoteDescription(
            RTCSessionDescription(sdp=offer_sdp, type="offer")
        )
        
        # Create answer
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
        
        return pc.localDescription.sdp

34.1.7. Edge vs Cloud Decision

FactorEdgeCloud
Bandwidth costLowHigh ($0.01-0.09/GB)
GPU availabilityLimited (INT8)Unlimited (FP32)
Maximum latency<100ms>500ms
Model sizeSmall (<100MB)Large (multi-GB)
Update complexityComplex (OTA)Easy (container deploy)
PrivacyHigh (data stays local)Requires consent
ReliabilityWorks offlineRequires connectivity

Cascade Pattern

Filter at edge, analyze in cloud:

graph TB
    A[Camera 30fps] --> B[Edge: Motion Detect]
    B -->|No Motion| C[Drop 95% frames]
    B -->|Motion| D[Edge: Person Detect]
    D -->|No Person| C
    D -->|Person 0.5fps| E[Cloud: Face Recognition]
    E --> F[Alert System]
    
    subgraph "Edge Device"
        B
        D
    end
    
    subgraph "Cloud"
        E
        F
    end

Edge Inference Implementation

import onnxruntime as ort
import numpy as np
from typing import List, Tuple

class EdgeInferenceEngine:
    """Optimized inference for edge devices."""
    
    def __init__(
        self, 
        model_path: str,
        quantized: bool = True,
        num_threads: int = 4
    ):
        # Configure for edge
        options = ort.SessionOptions()
        options.intra_op_num_threads = num_threads
        options.inter_op_num_threads = 1
        options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        
        providers = ["CPUExecutionProvider"]
        
        # Try GPU providers
        if ort.get_device() == "GPU":
            providers = ["CUDAExecutionProvider"] + providers
        
        self.session = ort.InferenceSession(
            model_path, 
            options, 
            providers=providers
        )
        
        # Get input details
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape
    
    def preprocess(self, image: np.ndarray) -> np.ndarray:
        """Preprocess image for model input."""
        # Resize
        target_size = (self.input_shape[3], self.input_shape[2])
        resized = cv2.resize(image, target_size)
        
        # Normalize and transpose
        normalized = resized.astype(np.float32) / 255.0
        transposed = np.transpose(normalized, (2, 0, 1))
        batched = np.expand_dims(transposed, 0)
        
        return batched
    
    def infer(self, preprocessed: np.ndarray) -> np.ndarray:
        """Run inference."""
        outputs = self.session.run(None, {self.input_name: preprocessed})
        return outputs[0]
    
    def postprocess(
        self, 
        outputs: np.ndarray, 
        conf_threshold: float = 0.5
    ) -> List[dict]:
        """Postprocess model outputs to detections."""
        detections = []
        
        for detection in outputs[0]:
            conf = detection[4]
            if conf < conf_threshold:
                continue
            
            x1, y1, x2, y2 = detection[:4]
            class_probs = detection[5:]
            class_id = np.argmax(class_probs)
            
            detections.append({
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "confidence": float(conf),
                "class_id": int(class_id)
            })
        
        return detections


# Cascade filter
class CascadeFilter:
    """Two-stage cascade: motion → detection."""
    
    def __init__(self, detector_model_path: str):
        self.motion_sampler = MotionSampler(motion_threshold=0.02)
        self.detector = EdgeInferenceEngine(detector_model_path)
        self.person_class_id = 0  # COCO person class
    
    def should_upload(self, frame: np.ndarray, timestamp: float) -> Tuple[bool, dict]:
        """Determine if frame should be sent to cloud."""
        
        # Stage 1: Motion detection (CPU, ~1ms)
        has_motion, motion_score = self.motion_sampler.should_process(frame, timestamp)
        
        if not has_motion:
            return False, {"reason": "no_motion", "motion_score": motion_score}
        
        # Stage 2: Person detection (GPU/NPU, ~20ms)
        preprocessed = self.detector.preprocess(frame)
        outputs = self.detector.infer(preprocessed)
        detections = self.detector.postprocess(outputs, conf_threshold=0.5)
        
        persons = [d for d in detections if d["class_id"] == self.person_class_id]
        
        if not persons:
            return False, {"reason": "no_person", "detections": len(detections)}
        
        return True, {
            "reason": "person_detected",
            "person_count": len(persons),
            "motion_score": motion_score
        }

34.1.8. Hardware Comparison

DeviceTOPSPowerPriceUse Case
Coral USB TPU42W$60Counting, classification
Coral Dev Board42W$130Standalone edge device
Jetson Nano40 (FP16)10W$200Entry-level detection
Jetson Orin Nano4015W$500Detection + tracking
Jetson Orin NX10025W$900Multi-camera pipeline
Jetson AGX Orin27560W$2000Full pipeline, complex models
Intel NUC + Arc200100W$1000Server-grade edge
Hailo-8263W$100Low-power inference

NVIDIA Jetson Deployment

# Dockerfile.jetson

FROM nvcr.io/nvidia/l4t-ml:r35.2.1-py3

WORKDIR /app

# Install dependencies
RUN pip install --no-cache-dir \
    opencv-python-headless \
    onnxruntime-gpu \
    pyyaml \
    redis

# Copy model (should be TensorRT optimized)
COPY models/yolov8n.engine /app/models/

# Copy application
COPY src/ /app/src/

ENV MODEL_PATH=/app/models/yolov8n.engine
ENV RTSP_URL=rtsp://camera:554/stream

CMD ["python", "src/main.py"]

TensorRT Optimization

import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np

class TensorRTInference:
    """Optimized inference using TensorRT."""
    
    def __init__(self, engine_path: str):
        self.logger = trt.Logger(trt.Logger.WARNING)
        
        # Load engine
        with open(engine_path, "rb") as f:
            self.engine = trt.Runtime(self.logger).deserialize_cuda_engine(f.read())
        
        self.context = self.engine.create_execution_context()
        
        # Allocate buffers
        self.inputs = []
        self.outputs = []
        self.bindings = []
        
        for binding in self.engine:
            size = trt.volume(self.engine.get_binding_shape(binding))
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            
            # Allocate host and device buffers
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            
            self.bindings.append(int(device_mem))
            
            if self.engine.binding_is_input(binding):
                self.inputs.append({"host": host_mem, "device": device_mem})
            else:
                self.outputs.append({"host": host_mem, "device": device_mem})
    
    def infer(self, input_data: np.ndarray) -> np.ndarray:
        """Run inference."""
        # Copy input to device
        np.copyto(self.inputs[0]["host"], input_data.ravel())
        cuda.memcpy_htod(self.inputs[0]["device"], self.inputs[0]["host"])
        
        # Execute
        self.context.execute_v2(self.bindings)
        
        # Copy output to host
        cuda.memcpy_dtoh(self.outputs[0]["host"], self.outputs[0]["device"])
        
        return self.outputs[0]["host"]

34.1.9. Monitoring & Observability

import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from dataclasses import dataclass

# Metrics
FRAMES_PROCESSED = Counter(
    "video_frames_processed_total",
    "Total frames processed",
    ["camera_id", "pipeline"]
)

PROCESSING_LATENCY = Histogram(
    "video_processing_latency_seconds",
    "Frame processing latency",
    ["camera_id", "stage"],
    buckets=[.01, .025, .05, .1, .25, .5, 1.0]
)

DETECTIONS = Counter(
    "video_detections_total",
    "Total object detections",
    ["camera_id", "class_name"]
)

PIPELINE_FPS = Gauge(
    "video_pipeline_fps",
    "Current pipeline FPS",
    ["camera_id"]
)

CAMERA_STATUS = Gauge(
    "video_camera_status",
    "Camera connection status (1=connected, 0=disconnected)",
    ["camera_id"]
)


class MetricsCollector:
    """Collect and export video pipeline metrics."""
    
    def __init__(self, port: int = 9090):
        start_http_server(port)
        self.last_frame_time = {}
    
    def record_frame(
        self, 
        camera_id: str,
        latencies: dict,
        detections: list
    ) -> None:
        """Record metrics for a processed frame."""
        FRAMES_PROCESSED.labels(camera_id=camera_id, pipeline="main").inc()
        
        for stage, latency in latencies.items():
            PROCESSING_LATENCY.labels(
                camera_id=camera_id, 
                stage=stage
            ).observe(latency)
        
        for det in detections:
            DETECTIONS.labels(
                camera_id=camera_id,
                class_name=det["class_name"]
            ).inc()
        
        # Calculate FPS
        now = time.time()
        if camera_id in self.last_frame_time:
            fps = 1.0 / (now - self.last_frame_time[camera_id])
            PIPELINE_FPS.labels(camera_id=camera_id).set(fps)
        self.last_frame_time[camera_id] = now
    
    def set_camera_status(self, camera_id: str, connected: bool) -> None:
        CAMERA_STATUS.labels(camera_id=camera_id).set(1 if connected else 0)

34.1.10. Troubleshooting Guide

ProblemSymptomsCauseSolution
Frames droppingGaps in videoBuffer overflowIncrease buffer, reduce FPS
High latency>2s delayBuffering too aggressiveUse latency=0, drop=true
Color artifactsGreen/pink framesYUV conversion errorVerify videoconvert in pipeline
Memory leakRAM grows over timeFrame references heldUse max-buffers=1 drop=true
Connection lostPeriodic disconnectsNetwork instabilityAdd reconnection logic
GPU not usedHigh CPU, slowWrong decoderCheck nvdec availability
Wrong timestampsPTS driftClock skewUse camera NTP sync

Debug Pipeline

# Test GStreamer pipeline
gst-launch-1.0 -v \
    rtspsrc location=rtsp://camera:554/stream latency=0 \
    ! rtph264depay ! h264parse ! avdec_h264 \
    ! videoconvert ! autovideosink

# Check NVIDIA decoder
gst-inspect-1.0 nvdec

# Monitor frame drops
GST_DEBUG=2 python your_script.py 2>&1 | grep -i drop

34.1.11. Summary Checklist

StepActionPriority
1Use GStreamer backend for RTSPCritical
2Implement reconnection logicCritical
3Buffer with KVS for cloud analyticsHigh
4Sample frames strategically (motion/I-frame)High
5Use PTS timestamps for syncHigh
6Consider edge inference for latencyMedium
7Convert to TensorRT for GPU edgeMedium
8Set up Prometheus metricsMedium
9Test cascade filtering ratiosMedium
10Document camera configurationsLow

[End of Section 34.1]