34.1. Video Stream Processing: RTSP, Kinesis & GStreamer
Note
The High-Bandwidth Challenge: A single 1080p 30fps stream is ~5 Mbps. A thousand cameras is 5 Gbps. Your “CSV” MLOps stack will melt.
34.1.1. The Video Pipeline Anatomy
graph LR
A[IP Camera] -->|RTSP| B[GStreamer Ingest]
B -->|MKV| C[Kinesis Video]
C --> D[Decoder]
D -->|RGB| E[ML Inference]
E --> F[Analytics]
| Stage | Function | Bottleneck | Typical Latency |
|---|---|---|---|
| Ingest | RTSP capture | Network stability | 50-200ms |
| Transport | Cloud buffering | Bandwidth cost | 100-500ms |
| Decode | H264 → RGB | CPU/GPU cycles | 10-50ms |
| Inference | Object detection | GPU memory | 20-100ms |
| Post-process | Tracking, alerts | CPU | 5-20ms |
Video ML vs Traditional ML
| Dimension | Traditional ML | Video ML |
|---|---|---|
| Data rate | GB/day | TB/hour |
| Latency tolerance | Seconds-minutes | Milliseconds |
| Processing | Batch | Streaming |
| Infrastructure | CPU clusters | GPU + specialized decoders |
| Cost driver | Compute | Bandwidth + storage |
34.1.2. GStreamer for RTSP Ingestion
GStreamer is the gold standard for video capture. OpenCV's default FFmpeg-backed `VideoCapture` handles network jitter, stream stalls, and reconnection poorly, so it falls apart under real-world RTSP conditions.
Basic RTSP Capture
import cv2
import numpy as np
from typing import Optional, Tuple
import threading
import queue
import time
class RTSPCapture:
    """Robust RTSP capture using the GStreamer backend of OpenCV.

    A background thread continuously drains the stream so callers always
    see the newest frame (old frames are dropped rather than queued), and
    the connection is re-established automatically after repeated read
    failures.
    """

    def __init__(
        self,
        rtsp_url: str,
        use_gpu: bool = False,
        buffer_size: int = 1,
        reconnect_attempts: int = 5
    ):
        """
        Args:
            rtsp_url: Full RTSP URL, e.g. ``rtsp://host:554/stream1``.
            use_gpu: Use the NVIDIA hardware decoder instead of avdec_h264.
            buffer_size: Max frames held; 1 keeps only the newest frame.
            reconnect_attempts: Connection tries before raising.

        Raises:
            ConnectionError: If the stream cannot be opened after
                ``reconnect_attempts`` tries.
        """
        self.rtsp_url = rtsp_url
        self.use_gpu = use_gpu
        self.buffer_size = buffer_size
        self.reconnect_attempts = reconnect_attempts
        self.pipeline = self._build_pipeline()
        self.cap = None
        self._connect()
        # Threading for non-blocking reads
        self.frame_queue = queue.Queue(maxsize=buffer_size)
        self.running = False
        self._thread = None

    def _build_pipeline(self) -> str:
        """Build the GStreamer pipeline string passed to cv2.VideoCapture."""
        # NOTE(review): "nvdec" is the legacy element name; recent GStreamer
        # releases expose the NVIDIA decoder as "nvh264dec" — verify with
        # `gst-inspect-1.0` on the target device.
        decoder = "nvdec" if self.use_gpu else "avdec_h264"
        # latency=0 + drop-on-latency + appsink drop=true keep end-to-end
        # latency bounded at the cost of dropping frames under load.
        pipeline = (
            f"rtspsrc location={self.rtsp_url} latency=0 "
            f"protocols=tcp drop-on-latency=true ! "
            f"rtph264depay ! h264parse ! {decoder} ! "
            f"videoconvert ! video/x-raw,format=BGR ! "
            f"appsink max-buffers=1 drop=true sync=false"
        )
        return pipeline

    def _connect(self) -> bool:
        """Connect with exponential backoff.

        Raises:
            ConnectionError: When every attempt fails.
        """
        for attempt in range(self.reconnect_attempts):
            # Release any previous capture before opening a new one so the
            # underlying GStreamer pipeline is not leaked on reconnect.
            if self.cap is not None:
                self.cap.release()
            self.cap = cv2.VideoCapture(self.pipeline, cv2.CAP_GSTREAMER)
            if self.cap.isOpened():
                print(f"Connected to {self.rtsp_url}")
                return True
            print(f"Connection attempt {attempt + 1} failed, retrying...")
            time.sleep(2 ** attempt)  # Exponential backoff
        raise ConnectionError(f"Failed to connect to {self.rtsp_url}")

    def start(self) -> None:
        """Start the background capture thread."""
        self.running = True
        self._thread = threading.Thread(target=self._capture_loop, daemon=True)
        self._thread.start()

    def _capture_loop(self) -> None:
        """Background thread: read frames and keep only the newest ones."""
        consecutive_failures = 0
        max_failures = 10
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                consecutive_failures += 1
                if consecutive_failures >= max_failures:
                    print("Connection lost, attempting reconnect...")
                    try:
                        self._connect()
                        consecutive_failures = 0
                    except ConnectionError:
                        print("Reconnection failed")
                        break
                continue
            consecutive_failures = 0
            # Drop the oldest frame if the queue is full so readers always
            # get the freshest frame (bounded latency instead of backlog).
            if self.frame_queue.full():
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
            try:
                # Never block the capture thread on a full queue; with a
                # single producer this branch is effectively unreachable.
                self.frame_queue.put_nowait((time.time(), frame))
            except queue.Full:
                pass

    def read(self, timeout: float = 1.0) -> Tuple[bool, Optional[np.ndarray], float]:
        """Read the newest frame, waiting up to ``timeout`` seconds.

        Returns:
            (success, frame, timestamp) — timestamp is the wall-clock time
            the frame was pulled off the pipeline, or 0.0 on timeout.
        """
        try:
            timestamp, frame = self.frame_queue.get(timeout=timeout)
            return True, frame, timestamp
        except queue.Empty:
            return False, None, 0.0

    def stop(self) -> None:
        """Stop the capture thread and release the underlying pipeline."""
        self.running = False
        if self._thread:
            self._thread.join(timeout=2.0)
        if self.cap:
            self.cap.release()
            self.cap = None
# Usage: pull frames from a GPU-decoded RTSP stream and print end-to-end
# latency. NOTE(review): example snippet — assumes `model` (an inference
# callable) is defined in the surrounding context.
cap = RTSPCapture(
    "rtsp://192.168.1.50:554/stream1",
    use_gpu=True,
    buffer_size=1
)
cap.start()
while True:
    success, frame, ts = cap.read()
    if not success:
        continue  # read timed out; keep polling
    # Run inference
    results = model(frame)
    # Calculate end-to-end latency (capture timestamp -> now)
    e2e_latency = time.time() - ts
    print(f"E2E latency: {e2e_latency*1000:.1f}ms")
Multi-Camera Manager
from dataclasses import dataclass
from typing import Dict, List, Callable
import concurrent.futures
import threading
@dataclass
class CameraConfig:
    """Static configuration describing one RTSP camera."""

    # Unique identifier; used as the dict key in MultiCameraManager.
    camera_id: str
    # Full RTSP endpoint, e.g. "rtsp://host:554/stream1".
    rtsp_url: str
    # Logical location / grouping label.
    zone: str
    # Request the GPU decoder pipeline when available.
    use_gpu: bool = True
    priority: int = 1  # 1=high, 2=medium, 3=low
class MultiCameraManager:
    """Manage a fleet of RTSP captures with bounded processing concurrency."""

    def __init__(
        self,
        configs: List[CameraConfig],
        max_concurrent: int = 10
    ):
        """
        Args:
            configs: Camera definitions to manage, keyed by camera_id.
            max_concurrent: Worker threads used for per-frame processing.
        """
        self.configs = {cfg.camera_id: cfg for cfg in configs}
        self.captures: Dict[str, RTSPCapture] = {}
        self.max_concurrent = max_concurrent
        self.executor = concurrent.futures.ThreadPoolExecutor(max_concurrent)
        self._lock = threading.Lock()

    def start_all(self) -> None:
        """Open every configured camera, lowest priority number first."""
        by_priority = sorted(self.configs.values(), key=lambda cfg: cfg.priority)
        for cfg in by_priority:
            self._start_camera(cfg)

    def _start_camera(self, config: CameraConfig) -> None:
        """Open one camera; connection failures are logged, not raised."""
        try:
            capture = RTSPCapture(
                config.rtsp_url,
                use_gpu=config.use_gpu
            )
            capture.start()
            with self._lock:
                self.captures[config.camera_id] = capture
            print(f"Started camera: {config.camera_id}")
        except ConnectionError as exc:
            print(f"Failed to start camera {config.camera_id}: {exc}")

    def process_all(
        self,
        inference_fn: Callable[[np.ndarray], dict],
        callback: Callable[[str, dict], None]
    ) -> None:
        """Run one inference pass over every active camera in parallel."""

        def handle(cam_id: str) -> None:
            # Skip cameras that disconnected after submission.
            capture = self.captures.get(cam_id)
            if not capture:
                return
            ok, frame, frame_ts = capture.read(timeout=0.1)
            if not ok:
                return
            outcome = inference_fn(frame)
            outcome["camera_id"] = cam_id
            outcome["timestamp"] = frame_ts
            callback(cam_id, outcome)

        pending = [self.executor.submit(handle, cid) for cid in self.captures]
        # Bound the wall-clock duration of a full sweep.
        concurrent.futures.wait(pending, timeout=1.0)

    def get_stats(self) -> dict:
        """Summarize connection state for every configured camera."""
        status = {
            cam_id: "active" if cam_id in self.captures else "disconnected"
            for cam_id in self.configs
        }
        return {
            "total_cameras": len(self.configs),
            "active_cameras": len(self.captures),
            "camera_status": status
        }

    def stop_all(self) -> None:
        """Stop every capture and shut the worker pool down."""
        for capture in self.captures.values():
            capture.stop()
        self.captures.clear()
        self.executor.shutdown(wait=True)
34.1.3. AWS Kinesis Video Streams
For cloud-scale video ingestion, Kinesis Video Streams provides durable, retained storage plus integration with downstream AWS services (Lambda, SageMaker, Rekognition).
Terraform Infrastructure
# kinesis_video.tf
# Per-camera configuration map; the map key becomes the camera id used
# in the Kinesis Video stream name ("camera-<key>").
variable "cameras" {
  type = map(object({
    device_name     = string
    zone            = string
    retention_hours = number
  }))
}

# One Kinesis Video stream per configured camera.
resource "aws_kinesis_video_stream" "camera" {
  for_each = var.cameras

  name                    = "camera-${each.key}"
  data_retention_in_hours = each.value.retention_hours
  device_name             = each.value.device_name
  media_type              = "video/h264"

  tags = {
    Environment = var.environment
    Zone        = each.value.zone
    ManagedBy   = "terraform"
  }
}
# IAM role for the KVS producer side.
# NOTE(review): this trust policy lets the Kinesis Video *service* assume
# the role; camera/edge producers typically assume a role through their
# own identity (e.g. the IoT credentials provider) — confirm the intended
# principal before shipping.
resource "aws_iam_role" "kvs_producer" {
  name = "kvs-producer-${var.environment}"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "kinesisvideo.amazonaws.com" }
    }]
  })
}

# Minimal permissions a producer needs to push media to the streams.
resource "aws_iam_role_policy" "kvs_producer_policy" {
  name = "kvs-producer-policy"
  role = aws_iam_role.kvs_producer.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "kinesisvideo:PutMedia",
          "kinesisvideo:GetDataEndpoint",
          "kinesisvideo:DescribeStream"
        ]
        # Scoped to exactly the streams created above.
        Resource = [for s in aws_kinesis_video_stream.camera : s.arn]
      }
    ]
  })
}
# Lambda for ML processing.
# NOTE(review): handler/runtime are not valid together with
# package_type = "Image" — AWS rejects that combination; remove them for
# container-image functions (the image's ENTRYPOINT/CMD is used instead).
resource "aws_lambda_function" "frame_processor" {
  function_name = "kvs-frame-processor-${var.environment}"
  role          = aws_iam_role.lambda_processor.arn
  handler       = "handler.process_frame"
  runtime       = "python3.11"
  memory_size   = 1024
  timeout       = 30

  # Use container image for ML dependencies
  package_type = "Image"
  image_uri    = "${aws_ecr_repository.ml_processor.repository_url}:latest"

  environment {
    variables = {
      MODEL_PATH = "s3://${var.model_bucket}/yolov8n.onnx"
    }
  }

  vpc_config {
    subnet_ids         = var.subnet_ids
    security_group_ids = [aws_security_group.lambda.id]
  }
}

# Connect KVS to Lambda
# NOTE(review): aws_lambda_event_source_mapping supports Kinesis Data
# Streams, DynamoDB, SQS, MSK, etc. — Kinesis *Video* Streams is not a
# Lambda event source, so this mapping will fail to apply. The usual
# patterns are a long-running GetMedia consumer (ECS/Fargate) or KVS
# image delivery to S3 triggering Lambda. Verify before shipping.
resource "aws_lambda_event_source_mapping" "kvs_trigger" {
  for_each = aws_kinesis_video_stream.camera

  event_source_arn  = each.value.arn
  function_name     = aws_lambda_function.frame_processor.arn
  starting_position = "LATEST"
  batch_size        = 1
}
Consuming from Kinesis Video
import boto3
from amazon_kinesis_video_consumer_library.kinesis_video_streams_parser import (
KinesisVideoStreamsParser,
)
import numpy as np
import av
from io import BytesIO
class KVSConsumer:
    """Pull and decode frames from an Amazon Kinesis Video stream."""

    def __init__(self, stream_name: str, region: str = "us-east-1"):
        """
        Args:
            stream_name: Name of the Kinesis Video stream to read.
            region: AWS region hosting the stream.
        """
        self.stream_name = stream_name
        self.region = region
        self.kvs_client = boto3.client("kinesisvideo", region_name=region)

    def get_media_endpoint(self) -> str:
        """Resolve the per-stream data endpoint for the GET_MEDIA API."""
        resp = self.kvs_client.get_data_endpoint(
            StreamName=self.stream_name,
            APIName="GET_MEDIA"
        )
        return resp["DataEndpoint"]

    def get_frames(self, start_selector: dict = None):
        """Yield decoded frames, starting at `start_selector` (default: live edge)."""
        media_client = boto3.client(
            "kinesis-video-media",
            endpoint_url=self.get_media_endpoint(),
            region_name=self.region
        )
        selector = {"StartSelectorType": "NOW"} if start_selector is None else start_selector
        stream = media_client.get_media(
            StreamName=self.stream_name,
            StartSelector=selector
        )
        # The payload is a continuous MKV byte stream; parse fragment
        # boundaries as the chunks arrive.
        parser = KinesisVideoStreamsParser()
        for chunk in stream["Payload"].iter_chunks():
            for fragment in parser.parse(chunk):
                yield from self._decode_fragment(fragment)

    def _decode_fragment(self, fragment: bytes) -> list:
        """Decode one MKV fragment into BGR frames with timing metadata."""
        container = av.open(BytesIO(fragment))
        return [
            {
                "image": decoded.to_ndarray(format="bgr24"),
                "pts": decoded.pts,
                "timestamp": decoded.time
            }
            for decoded in container.decode(video=0)
        ]
# Usage with inference
def process_stream(stream_name: str, model):
    """Run `model` on every frame pulled from the named KVS stream."""
    consumer = KVSConsumer(stream_name)
    for frame_data in consumer.get_frames():
        image, timestamp = frame_data["image"], frame_data["timestamp"]
        # Run inference
        results = model(image)
        # Log each detection with the source-frame timestamp.
        for detection in results.boxes:
            print(f"[{timestamp}] Detected: {detection.cls} at {detection.xyxy}")
34.1.4. Frame Sampling Strategy
Processing every frame is wasteful. Smart sampling reduces compute by 10-100x.
| Strategy | When to Use | Compute Savings | Accuracy Impact |
|---|---|---|---|
| Every N frames | Uniform sampling | N× | Low (if N≤10) |
| I-Frames only | Low-motion scenes | 30× | Medium |
| Motion-triggered | Security cameras | 50-100× | Very low |
| Scene change | Content analysis | Variable | Low |
| Adaptive rate | Mixed content | 10-50× | Very low |
I-Frame Extraction
import subprocess
import os
from pathlib import Path
from typing import List, Optional
import tempfile
def extract_iframes(
    video_path: str,
    output_dir: Optional[str] = None,
    quality: int = 2  # 1-31, lower is better
) -> List[str]:
    """Extract I-frames only for efficient processing.

    Args:
        video_path: Path to input video
        output_dir: Directory for output frames (temp if None)
        quality: JPEG quality (1=best, 31=worst)

    Returns:
        List of frame file paths

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    target_dir = tempfile.mkdtemp(prefix="iframes_") if output_dir is None else output_dir
    os.makedirs(target_dir, exist_ok=True)
    # select='eq(pict_type,...)' keeps only I-frames; -vsync vfr stops
    # ffmpeg from duplicating frames to fill the gaps.
    command = [
        "ffmpeg", "-i", video_path,
        "-vf", "select='eq(pict_type,PICT_TYPE_I)'",
        "-vsync", "vfr",
        "-q:v", str(quality),
        f"{target_dir}/frame_%06d.jpg"
    ]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"FFmpeg failed: {proc.stderr}")
    jpeg_names = (name for name in os.listdir(target_dir) if name.endswith('.jpg'))
    return sorted(os.path.join(target_dir, name) for name in jpeg_names)
def extract_with_timestamps(video_path: str) -> List[dict]:
    """Extract I-frame timestamps from a video using ffprobe.

    Args:
        video_path: Path to the input video.

    Returns:
        A list of ``{"type": "I", "timestamp": <seconds>}`` dicts, one per
        I-frame, in stream order.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe", "-v", "quiet",
        "-select_streams", "v:0",
        "-show_entries", "frame=pict_type,pts_time",
        "-of", "csv=p=0",
        video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Consistent with extract_iframes: surface tool failures instead
        # of silently returning an empty list.
        raise RuntimeError(f"ffprobe failed: {result.stderr}")
    iframes = []
    for line in result.stdout.strip().split("\n"):
        parts = line.split(",")
        if len(parts) != 2:
            continue
        # ffprobe emits fields in its own internal frame order (pts_time
        # before pict_type), NOT the order given to -show_entries, so
        # detect which column holds the picture type rather than assuming.
        if parts[0] in ("I", "P", "B"):
            pict_type, ts_text = parts
        else:
            ts_text, pict_type = parts
        if pict_type != "I" or ts_text in ("", "N/A"):
            continue
        iframes.append({"type": "I", "timestamp": float(ts_text)})
    return iframes
Motion-Based Sampling
import cv2
import numpy as np
from collections import deque
from typing import Generator, Tuple
class MotionSampler:
    """Sample frames only when inter-frame motion exceeds a threshold."""

    def __init__(
        self,
        motion_threshold: float = 0.02,
        min_interval: float = 0.1,
        cooldown_frames: int = 5
    ):
        """
        Args:
            motion_threshold: Fraction of changed pixels needed to trigger.
            min_interval: Minimum seconds between sampled frames.
            cooldown_frames: Frames to skip after each triggered sample.
        """
        self.motion_threshold = motion_threshold
        self.min_interval = min_interval
        self.cooldown_frames = cooldown_frames
        self.prev_frame = None
        self.frame_buffer = deque(maxlen=3)
        self.last_sample_time = 0
        self.cooldown_counter = 0

    def calculate_motion(self, frame: np.ndarray) -> float:
        """Return the fraction of pixels changed since the previous frame."""
        # Blurred grayscale suppresses sensor noise before differencing.
        current = cv2.GaussianBlur(
            cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), (21, 21), 0
        )
        if self.prev_frame is None:
            # First frame: nothing to compare against yet.
            self.prev_frame = current
            return 0.0
        # Binary mask of pixels whose intensity changed by more than 25.
        delta = cv2.absdiff(self.prev_frame, current)
        _, changed = cv2.threshold(delta, 25, 255, cv2.THRESH_BINARY)
        self.prev_frame = current
        return np.sum(changed > 0) / changed.size

    def should_process(
        self,
        frame: np.ndarray,
        timestamp: float
    ) -> Tuple[bool, float]:
        """Decide whether `frame` is worth processing.

        Returns:
            (should_process, motion_score)
        """
        score = self.calculate_motion(frame)
        # Rate limit: never sample faster than min_interval.
        if timestamp - self.last_sample_time < self.min_interval:
            return False, score
        # Cooldown: skip a few frames after every triggered sample.
        if self.cooldown_counter > 0:
            self.cooldown_counter -= 1
            return False, score
        if score <= self.motion_threshold:
            return False, score
        self.last_sample_time = timestamp
        self.cooldown_counter = self.cooldown_frames
        return True, score

    def process_stream(
        self,
        capture: RTSPCapture
    ) -> Generator[Tuple[np.ndarray, float, float], None, None]:
        """Yield (frame, timestamp, motion_score) whenever motion triggers."""
        capture.start()
        while True:
            ok, frame, ts = capture.read()
            if not ok:
                continue
            triggered, score = self.should_process(frame, ts)
            if triggered:
                yield frame, ts, score
# Usage
# NOTE(review): example snippet — assumes `camera` (an RTSPCapture) and
# `model` are defined in the surrounding context.
sampler = MotionSampler(motion_threshold=0.03)
for frame, ts, motion in sampler.process_stream(camera):
    print(f"Motion detected: {motion:.2%}")
    results = model(frame)
Adaptive Rate Sampling
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class SamplingConfig:
    """Tuning knobs for AdaptiveSampler."""

    base_fps: float = 1.0  # starting sample rate
    max_fps: float = 10.0  # ceiling while the scene is busy
    min_fps: float = 0.1  # floor while the scene is idle
    activity_boost_factor: float = 2.0  # rate multiplier when busy
    decay_rate: float = 0.9  # rate multiplier when idle


class AdaptiveSampler:
    """Raise the sample rate when the scene is busy, lower it when idle."""

    def __init__(self, config: SamplingConfig = None):
        self.config = config or SamplingConfig()
        self.current_fps = self.config.base_fps
        self.last_sample_time = 0
        self.activity_score = 0.0

    def update_activity(self, detections: int, motion: float) -> None:
        """Fold the latest inference results into the activity score and FPS."""
        # Blend detection count and motion magnitude into one number.
        observed = detections * 0.5 + motion * 10
        # Exponential moving average: 70% history, 30% new observation.
        self.activity_score = 0.7 * self.activity_score + 0.3 * observed
        cfg = self.config
        if self.activity_score > 1.0:
            # Busy scene: multiply the rate up, capped at max_fps.
            self.current_fps = min(
                self.current_fps * cfg.activity_boost_factor, cfg.max_fps
            )
        else:
            # Quiet scene: decay the rate toward min_fps.
            self.current_fps = max(
                self.current_fps * cfg.decay_rate, cfg.min_fps
            )

    def should_sample(self) -> bool:
        """Return True when enough wall-clock time passed at the current FPS."""
        now = time.time()
        if now - self.last_sample_time < 1.0 / self.current_fps:
            return False
        self.last_sample_time = now
        return True

    def get_stats(self) -> dict:
        """Snapshot of the sampler state for monitoring dashboards."""
        return {
            "current_fps": round(self.current_fps, 2),
            "activity_score": round(self.activity_score, 2),
            "sample_interval_ms": round(1000 / self.current_fps, 1)
        }
34.1.5. Latency Comparison
| Protocol | Typical Latency | Reliability | Use Case |
|---|---|---|---|
| RTSP/TCP | 1-3s | High | Recording, analytics |
| RTSP/UDP | 500ms-1s | Medium | Lower latency streaming |
| HLS | 6-30s | Very High | Broadcast, CDN distribution |
| DASH | 3-20s | Very High | Adaptive bitrate streaming |
| WebRTC | 100-500ms | Medium | Real-time interaction |
| Direct UDP | 50-200ms | Low | Robot control, gaming |
Latency Breakdown
gantt
title Video Pipeline Latency Breakdown
dateFormat X
axisFormat %L ms
section Capture
Camera encode :0, 30
Network transfer :30, 80
section Ingest
RTSP parse :80, 90
Buffer/sync :90, 120
section Decode
H264 decode :120, 150
Format convert :150, 160
section ML
Preprocess :160, 170
Inference :170, 220
Post-process :220, 235
section Output
Result publish :235, 245
Measuring End-to-End Latency
import time
import cv2
import numpy as np
from dataclasses import dataclass, field
from typing import List
from statistics import mean, stdev
@dataclass
class LatencyMeasurement:
    """Per-frame latency breakdown (all values in seconds)."""

    capture_time: float  # wall-clock time the frame was captured
    decode_time: float
    preprocess_time: float
    inference_time: float
    postprocess_time: float

    @property
    def total(self) -> float:
        """Sum of the processing stages (capture_time is a timestamp, not a duration)."""
        return sum((
            self.decode_time,
            self.preprocess_time,
            self.inference_time,
            self.postprocess_time,
        ))
class LatencyTracker:
    """Track rolling latency statistics for the video pipeline.

    Only the most recent ``window_size`` measurements are kept; older
    entries are evicted automatically.
    """

    def __init__(self, window_size: int = 100):
        """
        Args:
            window_size: Number of recent measurements to aggregate over.
        """
        self.window_size = window_size
        # deque(maxlen=...) evicts the oldest entry in O(1), unlike
        # list.pop(0) which shifts the whole list (O(n)) on every frame
        # once the window is full.
        self.measurements: deque = deque(maxlen=window_size)

    def record(self, measurement: LatencyMeasurement) -> None:
        """Add one per-frame measurement to the rolling window."""
        self.measurements.append(measurement)

    def get_stats(self) -> dict:
        """Aggregate mean/std/min/max (in ms) per stage over the window.

        Returns:
            Empty dict when no measurements were recorded yet.
        """
        if not self.measurements:
            return {}

        def calc_stats(values: List[float]) -> dict:
            # Convert seconds -> milliseconds and round for readability.
            return {
                "mean": round(mean(values) * 1000, 2),
                "std": round(stdev(values) * 1000, 2) if len(values) > 1 else 0,
                "min": round(min(values) * 1000, 2),
                "max": round(max(values) * 1000, 2)
            }

        return {
            "decode_ms": calc_stats([m.decode_time for m in self.measurements]),
            "preprocess_ms": calc_stats([m.preprocess_time for m in self.measurements]),
            "inference_ms": calc_stats([m.inference_time for m in self.measurements]),
            "postprocess_ms": calc_stats([m.postprocess_time for m in self.measurements]),
            "total_ms": calc_stats([m.total for m in self.measurements]),
            "sample_count": len(self.measurements)
        }
def benchmark_pipeline(
    capture: RTSPCapture,
    model,
    num_frames: int = 100
) -> dict:
    """Measure per-stage latency over ``num_frames`` frames.

    Args:
        capture: An RTSPCapture to pull frames from (started here, stopped on exit).
        model: Object exposing preprocess / infer / postprocess.
        num_frames: Number of frames to measure before returning.

    Returns:
        Rolling statistics from LatencyTracker.get_stats().
    """
    tracker = LatencyTracker()
    capture.start()
    measured = 0
    while measured < num_frames:
        ok, frame, captured_at = capture.read()
        if not ok:
            continue
        # Stage boundaries timed with perf_counter for best resolution.
        # Decode proper already happened in GStreamer; the color-space
        # conversion here stands in for residual decode overhead.
        t_start = time.perf_counter()
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        t_decoded = time.perf_counter()
        prepared = model.preprocess(frame_rgb)
        t_prepared = time.perf_counter()
        raw_outputs = model.infer(prepared)
        t_inferred = time.perf_counter()
        model.postprocess(raw_outputs)
        t_done = time.perf_counter()
        tracker.record(LatencyMeasurement(
            capture_time=captured_at,
            decode_time=t_decoded - t_start,
            preprocess_time=t_prepared - t_decoded,
            inference_time=t_inferred - t_prepared,
            postprocess_time=t_done - t_inferred
        ))
        measured += 1
    capture.stop()
    return tracker.get_stats()
34.1.6. WebRTC for Low Latency
When sub-second latency is critical, WebRTC is the answer.
import asyncio
from aiortc import RTCPeerConnection, VideoStreamTrack, RTCSessionDescription
from aiortc.contrib.media import MediaBlackhole
from av import VideoFrame
import numpy as np
class MLVideoTrack(VideoStreamTrack):
    """aiortc video track that runs ML inference on each received frame.

    Wraps a source track: every frame is pulled from ``source_track``,
    run through ``model`` in a worker thread (so the event loop is not
    blocked), annotated with detection boxes, and forwarded downstream.
    """
    kind = "video"

    def __init__(self, source_track, model):
        """
        Args:
            source_track: Upstream aiortc track supplying raw frames.
            model: Callable taking a BGR ndarray; its result must expose a
                `.boxes` iterable (YOLO-style API, per draw_detections).
        """
        super().__init__()
        self.source = source_track
        self.model = model
        self.frame_count = 0

    async def recv(self) -> VideoFrame:
        """Receive one frame, run inference, return the annotated frame."""
        frame = await self.source.recv()
        # Convert to numpy (BGR) for OpenCV/model consumption
        img = frame.to_ndarray(format="bgr24")
        # Run the (blocking) model in the default executor so it does not
        # stall the asyncio event loop. get_running_loop() is the correct
        # call inside a coroutine; get_event_loop() is deprecated for this
        # use since Python 3.10.
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, self.model, img)
        # Draw results
        annotated = self.draw_detections(img, results)
        # Re-wrap as a VideoFrame, preserving timing so A/V sync holds.
        new_frame = VideoFrame.from_ndarray(annotated, format="bgr24")
        new_frame.pts = frame.pts
        new_frame.time_base = frame.time_base
        self.frame_count += 1
        return new_frame

    def draw_detections(self, image: np.ndarray, results) -> np.ndarray:
        """Draw detection boxes and class/confidence labels on `image` in place."""
        import cv2
        for box in results.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            conf = box.conf[0]
            cls = int(box.cls[0])
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(
                image,
                f"{cls}: {conf:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2
            )
        return image
class WebRTCMLServer:
    """Minimal WebRTC signaling endpoint that attaches ML processing."""

    def __init__(self, model):
        """
        Args:
            model: Inference callable passed to each MLVideoTrack.
        """
        self.model = model
        self.peers = {}

    async def handle_offer(self, offer_sdp: str, peer_id: str) -> str:
        """Accept an SDP offer, wire up ML processing, return the answer SDP."""
        pc = RTCPeerConnection()
        self.peers[peer_id] = pc

        @pc.on("track")
        async def on_track(track):
            # Only video tracks get the ML wrapper; other kinds are ignored.
            if track.kind == "video":
                pc.addTrack(MLVideoTrack(track, self.model))

        @pc.on("connectionstatechange")
        async def on_connection_state_change():
            # Drop our reference once the peer connection closes.
            if pc.connectionState == "closed":
                del self.peers[peer_id]

        # Standard offer/answer handshake.
        await pc.setRemoteDescription(
            RTCSessionDescription(sdp=offer_sdp, type="offer")
        )
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
        return pc.localDescription.sdp
34.1.7. Edge vs Cloud Decision
| Factor | Edge | Cloud |
|---|---|---|
| Bandwidth cost | Low | High ($0.01-0.09/GB) |
| GPU availability | Limited (INT8) | Unlimited (FP32) |
| Maximum latency | <100ms | >500ms |
| Model size | Small (<100MB) | Large (multi-GB) |
| Update complexity | Complex (OTA) | Easy (container deploy) |
| Privacy | High (data stays local) | Requires consent |
| Reliability | Works offline | Requires connectivity |
Cascade Pattern
Filter at edge, analyze in cloud:
graph TB
A[Camera 30fps] --> B[Edge: Motion Detect]
B -->|No Motion| C[Drop 95% frames]
B -->|Motion| D[Edge: Person Detect]
D -->|No Person| C
D -->|Person 0.5fps| E[Cloud: Face Recognition]
E --> F[Alert System]
subgraph "Edge Device"
B
D
end
subgraph "Cloud"
E
F
end
Edge Inference Implementation
import onnxruntime as ort
import numpy as np
from typing import List, Tuple
class EdgeInferenceEngine:
    """ONNX Runtime inference tuned for resource-constrained edge devices."""

    def __init__(
        self,
        model_path: str,
        quantized: bool = True,
        num_threads: int = 4
    ):
        """
        Args:
            model_path: Path to the ONNX model file.
            quantized: Kept for API compatibility (not consulted here).
            num_threads: Intra-op thread count for CPU execution.
        """
        # Session options tuned for a small device: limited threading,
        # all graph optimizations enabled.
        opts = ort.SessionOptions()
        opts.intra_op_num_threads = num_threads
        opts.inter_op_num_threads = 1
        opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        # Prefer CUDA when the runtime reports a GPU; always keep CPU as fallback.
        providers = ["CPUExecutionProvider"]
        if ort.get_device() == "GPU":
            providers.insert(0, "CUDAExecutionProvider")
        self.session = ort.InferenceSession(
            model_path,
            opts,
            providers=providers
        )
        # Cache input metadata for preprocess().
        first_input = self.session.get_inputs()[0]
        self.input_name = first_input.name
        self.input_shape = first_input.shape

    def preprocess(self, image: np.ndarray) -> np.ndarray:
        """Resize, normalize, and lay an image out as NCHW float32."""
        # input_shape is NCHW: (batch, channels, height, width).
        width, height = self.input_shape[3], self.input_shape[2]
        resized = cv2.resize(image, (width, height))
        scaled = resized.astype(np.float32) / 255.0
        chw = np.transpose(scaled, (2, 0, 1))  # HWC -> CHW
        return np.expand_dims(chw, 0)  # add batch dimension

    def infer(self, preprocessed: np.ndarray) -> np.ndarray:
        """Run the model and return the first output tensor."""
        return self.session.run(None, {self.input_name: preprocessed})[0]

    def postprocess(
        self,
        outputs: np.ndarray,
        conf_threshold: float = 0.5
    ) -> List[dict]:
        """Convert raw output rows into detection dicts.

        Each row is assumed to be YOLO-style:
        [x1, y1, x2, y2, objectness, class scores...].
        """
        detections = []
        for row in outputs[0]:
            score = row[4]
            if score < conf_threshold:
                continue
            best_class = np.argmax(row[5:])
            detections.append({
                "bbox": [float(v) for v in row[:4]],
                "confidence": float(score),
                "class_id": int(best_class)
            })
        return detections
# Cascade filter
class CascadeFilter:
    """Two-stage edge cascade: cheap motion gate, then person detection."""

    def __init__(self, detector_model_path: str):
        """
        Args:
            detector_model_path: Path to the ONNX person-detector model.
        """
        self.motion_sampler = MotionSampler(motion_threshold=0.02)
        self.detector = EdgeInferenceEngine(detector_model_path)
        self.person_class_id = 0  # COCO person class

    def should_upload(self, frame: np.ndarray, timestamp: float) -> Tuple[bool, dict]:
        """Return (upload?, reason-info) for one frame.

        Stage 1 (motion, ~1ms on CPU) rejects static frames; stage 2
        (person detector, ~20ms on GPU/NPU) rejects frames without people.
        """
        moved, motion_score = self.motion_sampler.should_process(frame, timestamp)
        if not moved:
            return False, {"reason": "no_motion", "motion_score": motion_score}
        tensor = self.detector.preprocess(frame)
        raw = self.detector.infer(tensor)
        detections = self.detector.postprocess(raw, conf_threshold=0.5)
        people = [d for d in detections if d["class_id"] == self.person_class_id]
        if not people:
            return False, {"reason": "no_person", "detections": len(detections)}
        return True, {
            "reason": "person_detected",
            "person_count": len(people),
            "motion_score": motion_score
        }
34.1.8. Hardware Comparison
| Device | TOPS | Power | Price | Use Case |
|---|---|---|---|---|
| Coral USB TPU | 4 | 2W | $60 | Counting, classification |
| Coral Dev Board | 4 | 2W | $130 | Standalone edge device |
| Jetson Nano | 40 (FP16) | 10W | $200 | Entry-level detection |
| Jetson Orin Nano | 40 | 15W | $500 | Detection + tracking |
| Jetson Orin NX | 100 | 25W | $900 | Multi-camera pipeline |
| Jetson AGX Orin | 275 | 60W | $2000 | Full pipeline, complex models |
| Intel NUC + Arc | 200 | 100W | $1000 | Server-grade edge |
| Hailo-8 | 26 | 3W | $100 | Low-power inference |
NVIDIA Jetson Deployment
# Dockerfile.jetson
# L4T (Linux for Tegra) ML base image — version must match the JetPack
# release installed on the target device (here r35.2.1).
FROM nvcr.io/nvidia/l4t-ml:r35.2.1-py3

WORKDIR /app

# Install dependencies
RUN pip install --no-cache-dir \
    opencv-python-headless \
    onnxruntime-gpu \
    pyyaml \
    redis

# Copy model (should be TensorRT optimized)
# NOTE(review): .engine files are specific to the GPU and TensorRT version
# they were built with — rebuild whenever the base image changes.
COPY models/yolov8n.engine /app/models/

# Copy application
COPY src/ /app/src/

# Defaults; override per deployment.
ENV MODEL_PATH=/app/models/yolov8n.engine
ENV RTSP_URL=rtsp://camera:554/stream

CMD ["python", "src/main.py"]
TensorRT Optimization
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
class TensorRTInference:
    """Optimized inference using TensorRT."""

    def __init__(self, engine_path: str):
        """Load a serialized TensorRT engine and pre-allocate I/O buffers.

        Args:
            engine_path: Path to a serialized ``.engine`` file built for
                this exact GPU and TensorRT version.
        """
        self.logger = trt.Logger(trt.Logger.WARNING)
        # Load engine
        with open(engine_path, "rb") as f:
            self.engine = trt.Runtime(self.logger).deserialize_cuda_engine(f.read())
        self.context = self.engine.create_execution_context()
        # Allocate buffers
        # NOTE(review): this uses the pre-TensorRT-10 binding API
        # (get_binding_shape / binding_is_input); TensorRT 10 removed it
        # in favor of the named-tensor API — verify the target TRT version.
        self.inputs = []
        self.outputs = []
        self.bindings = []
        for binding in self.engine:
            size = trt.volume(self.engine.get_binding_shape(binding))
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            # Allocate host (page-locked, for fast DMA) and device buffers
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self.bindings.append(int(device_mem))
            if self.engine.binding_is_input(binding):
                self.inputs.append({"host": host_mem, "device": device_mem})
            else:
                self.outputs.append({"host": host_mem, "device": device_mem})

    def infer(self, input_data: np.ndarray) -> np.ndarray:
        """Run synchronous inference.

        Args:
            input_data: Array matching the engine's (single) input binding;
                flattened into the pre-allocated page-locked host buffer.

        Returns:
            The flat output array of the first output binding (a view into
            the reused host buffer — copy it if it must outlive the next call).
        """
        # Copy input to device
        np.copyto(self.inputs[0]["host"], input_data.ravel())
        cuda.memcpy_htod(self.inputs[0]["device"], self.inputs[0]["host"])
        # Execute
        self.context.execute_v2(self.bindings)
        # Copy output to host
        cuda.memcpy_dtoh(self.outputs[0]["host"], self.outputs[0]["device"])
        return self.outputs[0]["host"]
34.1.9. Monitoring & Observability
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from dataclasses import dataclass
# Metrics — module-level Prometheus collectors shared by all pipelines.
# Total frames processed, labelled per camera and pipeline.
FRAMES_PROCESSED = Counter(
    "video_frames_processed_total",
    "Total frames processed",
    ["camera_id", "pipeline"]
)
# Per-stage latency distribution; buckets span 10ms to 1s.
PROCESSING_LATENCY = Histogram(
    "video_processing_latency_seconds",
    "Frame processing latency",
    ["camera_id", "stage"],
    buckets=[.01, .025, .05, .1, .25, .5, 1.0]
)
# Object detections, labelled by detected class name.
DETECTIONS = Counter(
    "video_detections_total",
    "Total object detections",
    ["camera_id", "class_name"]
)
# Instantaneous frames-per-second, per camera.
PIPELINE_FPS = Gauge(
    "video_pipeline_fps",
    "Current pipeline FPS",
    ["camera_id"]
)
# Connection state flag per camera.
CAMERA_STATUS = Gauge(
    "video_camera_status",
    "Camera connection status (1=connected, 0=disconnected)",
    ["camera_id"]
)
class MetricsCollector:
    """Export per-camera video pipeline metrics via Prometheus."""

    def __init__(self, port: int = 9090):
        """Start the Prometheus HTTP exporter on ``port``."""
        start_http_server(port)
        self.last_frame_time = {}

    def record_frame(
        self,
        camera_id: str,
        latencies: dict,
        detections: list
    ) -> None:
        """Record counters, latencies, and FPS for one processed frame.

        Args:
            camera_id: Source camera label.
            latencies: Mapping of stage name -> latency in seconds.
            detections: Detection dicts, each carrying a "class_name" key.
        """
        FRAMES_PROCESSED.labels(camera_id=camera_id, pipeline="main").inc()
        for stage_name, seconds in latencies.items():
            PROCESSING_LATENCY.labels(
                camera_id=camera_id,
                stage=stage_name
            ).observe(seconds)
        for detection in detections:
            DETECTIONS.labels(
                camera_id=camera_id,
                class_name=detection["class_name"]
            ).inc()
        # Derive instantaneous FPS from the gap since the previous frame.
        now = time.time()
        previous = self.last_frame_time.get(camera_id)
        if previous is not None:
            PIPELINE_FPS.labels(camera_id=camera_id).set(1.0 / (now - previous))
        self.last_frame_time[camera_id] = now

    def set_camera_status(self, camera_id: str, connected: bool) -> None:
        """Publish the camera's connection state (1=connected, 0=down)."""
        CAMERA_STATUS.labels(camera_id=camera_id).set(1 if connected else 0)
34.1.10. Troubleshooting Guide
| Problem | Symptoms | Cause | Solution |
|---|---|---|---|
| Frames dropping | Gaps in video | Buffer overflow | Increase buffer, reduce FPS |
| High latency | >2s delay | Buffering too aggressive | Use latency=0, drop=true |
| Color artifacts | Green/pink frames | YUV conversion error | Verify videoconvert in pipeline |
| Memory leak | RAM grows over time | Frame references held | Use max-buffers=1 drop=true |
| Connection lost | Periodic disconnects | Network instability | Add reconnection logic |
| GPU not used | High CPU, slow | Wrong decoder | Check nvdec availability |
| Wrong timestamps | PTS drift | Clock skew | Use camera NTP sync |
Debug Pipeline
# Test GStreamer pipeline end-to-end (renders to a local window)
gst-launch-1.0 -v \
  rtspsrc location=rtsp://camera:554/stream latency=0 \
  ! rtph264depay ! h264parse ! avdec_h264 \
  ! videoconvert ! autovideosink
# Check NVIDIA decoder is available on this host
gst-inspect-1.0 nvdec
# Monitor frame drops (GST_DEBUG=2 = warnings/errors only)
GST_DEBUG=2 python your_script.py 2>&1 | grep -i drop
34.1.11. Summary Checklist
| Step | Action | Priority |
|---|---|---|
| 1 | Use GStreamer backend for RTSP | Critical |
| 2 | Implement reconnection logic | Critical |
| 3 | Buffer with KVS for cloud analytics | High |
| 4 | Sample frames strategically (motion/I-frame) | High |
| 5 | Use PTS timestamps for sync | High |
| 6 | Consider edge inference for latency | Medium |
| 7 | Convert to TensorRT for GPU edge | Medium |
| 8 | Set up Prometheus metrics | Medium |
| 9 | Test cascade filtering ratios | Medium |
| 10 | Document camera configurations | Low |
[End of Section 34.1]