
34.2. Spatial Consistency & Object Tracking

Important

Detection vs. Tracking: YOLO gives you “Car at [x,y]” for a single frame. It has no memory. Tracking gives you “Car #42 has moved from A to B.” Without tracking, you cannot count cars, measure dwell time, or detect loitering.
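A toy illustration of the difference (numbers invented): counting per-frame detections overcounts, while counting unique track IDs does not.

# Hypothetical numbers: one car seen in three consecutive frames.
frame_detections = [["car"], ["car"], ["car"]]
naive_count = sum(len(d) for d in frame_detections)   # 3 -- wrong
tracked_ids = [{42}, {42}, {42}]                      # tracker keeps ID #42
true_count = len(set().union(*tracked_ids))           # 1 -- correct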


34.2.1. The Tracking Hierarchy

graph TB
    A[Object Detection] --> B["I see a car"]
    C[Multi-Object Tracking] --> D["I see Car #1 and Car #2 across frames"]
    E[Multi-Camera Tracking] --> F["Car #1 left Cam A, entered Cam B"]
    
    A --> C --> E

| Level | Capability | Algorithm | Use Case |
|---|---|---|---|
| OD | Single-frame detection | YOLO, EfficientDet | Object counting |
| MOT | Cross-frame tracking | DeepSORT, ByteTrack | Path analysis |
| MCT | Cross-camera tracking | ReID | City-wide tracking |

34.2.2. Algorithms: SORT and DeepSORT

SORT (Simple Online and Realtime Tracking)

| Component | Function |
|---|---|
| Kalman Filter | Predict the next box position |
| IoU Matching | Associate predictions with detections |
| Track Management | Birth and death of tracks |

Pros: extremely fast (CPU-only).
Cons: fails on occlusion (ID switches).
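How the three components fit together, as a minimal sketch. BoxKalmanFilter and associate_detections are defined in Sections 34.2.3 and 34.2.4 below; the conversion between the filter's [u, v, s, r] state and corner-format boxes is omitted for brevity.

def sort_step(tracks, detections, iou_threshold=0.3):
    # 1. Kalman filter: predict where each existing track should be now
    predicted = [t.predict() for t in tracks]   # [u, v, s, r] per track

    # 2. IoU matching: assign new detections to the predictions
    matches, unmatched_trk, unmatched_det = associate_detections(
        predicted, detections, iou_threshold)

    # 3. Track management: correct matched tracks, start new ones;
    #    tracks unmatched for more than max_age frames are deleted
    for t_idx, d_idx in matches:
        tracks[t_idx].update(detections[d_idx])
    tracks += [BoxKalmanFilter() for _ in unmatched_det]
    return tracks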

DeepSORT

DeepSORT adds a deep appearance descriptor (a CNN embedding of each detection crop), so tracks can be re-associated after short occlusions:

import torch
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort

class DeepSORTTracker:
    def __init__(self, max_age: int = 30, n_init: int = 3):
        self.tracker = DeepSort(
            max_age=max_age,
            n_init=n_init,
            embedder="mobilenet",
            embedder_gpu=True
        )
    
    def update(self, detections: list, frame: np.ndarray) -> list:
        """
        Update tracks with new detections.
        
        Args:
            detections: List of ([left, top, width, height], confidence, class)
                tuples, the format deep_sort_realtime expects
            frame: BGR image for appearance extraction
        
        Returns:
            List of tracks with IDs
        """
        tracks = self.tracker.update_tracks(detections, frame=frame)
        
        results = []
        for track in tracks:
            if not track.is_confirmed():
                continue
            
            track_id = track.track_id
            bbox = track.to_ltrb()  # Left, Top, Right, Bottom
            results.append({
                'id': track_id,
                'bbox': bbox,
                'age': track.age,
                'hits': track.hits
            })
        
        return results
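One way to wire a detector into this tracker; a sketch assuming the ultralytics YOLOv8 API and an illustrative input file (traffic.mp4):

from ultralytics import YOLO
import cv2

model = YOLO("yolov8n.pt")              # any detector works; YOLOv8 assumed
tracker = DeepSORTTracker()

cap = cv2.VideoCapture("traffic.mp4")   # hypothetical input video
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    dets = []
    for box in model(frame)[0].boxes:
        x1, y1, x2, y2 = box.xyxy[0].tolist()
        # deep_sort_realtime wants ([left, top, w, h], confidence, class)
        dets.append(([x1, y1, x2 - x1, y2 - y1], float(box.conf), int(box.cls)))
    for trk in tracker.update(dets, frame):
        print(trk['id'], trk['bbox'])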

ByteTrack (State of the Art)

Most trackers discard low-confidence detections. ByteTrack matches high-confidence detections first, then runs a second association pass against the low-confidence ones to recover occluded objects:

# Based on the official YOLOX ByteTrack implementation
# (https://github.com/ifzhang/ByteTrack); its BYTETracker takes an
# argparse-style namespace, and other distributions differ.
from types import SimpleNamespace

import numpy as np
from yolox.tracker.byte_tracker import BYTETracker

class ByteTrackAdapter:
    """Wrapper for the ByteTrack algorithm."""
    
    def __init__(self, track_thresh: float = 0.5, match_thresh: float = 0.8):
        args = SimpleNamespace(
            track_thresh=track_thresh,  # high-confidence threshold
            match_thresh=match_thresh,  # IoU threshold for association
            track_buffer=30,            # frames to keep lost tracks alive
            mot20=False,
        )
        self.tracker = BYTETracker(args, frame_rate=30)
    
    def update(self, detections: np.ndarray, img_size: tuple) -> list:
        """
        Args:
            detections: (N, 5) array of [x1, y1, x2, y2, score]
            img_size: (height, width) of the source frame
        """
        online_targets = self.tracker.update(detections, img_size, img_size)
        
        return [
            {'id': t.track_id, 'bbox': t.tlbr, 'score': t.score}
            for t in online_targets
        ]

34.2.3. The Kalman Filter

State vector for 2D bounding box: $[u, v, s, r, \dot{u}, \dot{v}, \dot{s}]$

| Variable | Meaning |
|---|---|
| $u, v$ | Center position |
| $s$ | Scale (area) |
| $r$ | Aspect ratio |
| $\dot{u}, \dot{v}, \dot{s}$ | Velocities |

Implementation

from filterpy.kalman import KalmanFilter
import numpy as np

class BoxKalmanFilter:
    """Kalman filter for bounding box tracking."""
    
    def __init__(self):
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        
        # State transition (constant velocity model)
        self.kf.F = np.array([
            [1, 0, 0, 0, 1, 0, 0],
            [0, 1, 0, 0, 0, 1, 0],
            [0, 0, 1, 0, 0, 0, 1],
            [0, 0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 1]
        ])
        
        # Measurement matrix (we observe x, y, s, r)
        self.kf.H = np.array([
            [1, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0]
        ])
        
        # Measurement noise: scale/ratio observations are noisier (as in SORT)
        self.kf.R[2:, 2:] *= 10.0
        
        # High initial uncertainty on the unobservable velocities (as in SORT)
        self.kf.P[4:, 4:] *= 1000.0
        self.kf.P *= 10.0
        
        # Process noise: trust the constant-velocity model fairly strongly
        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
    
    def predict(self) -> np.ndarray:
        """Predict next state."""
        self.kf.predict()
        return self.kf.x[:4].flatten()
    
    def update(self, measurement: np.ndarray):
        """Update with observation."""
        self.kf.update(measurement)
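A short usage sketch with invented measurements in [u, v, s, r] form:

kf = BoxKalmanFilter()
kf.kf.x[:4] = np.array([[100.0], [200.0], [5000.0], [0.5]])  # first observation

for z in ([102, 203, 5050, 0.5], [105, 207, 5100, 0.5]):
    predicted = kf.predict()                     # a-priori [u, v, s, r]
    kf.update(np.array(z, dtype=float))          # correct with the measurement
    print(predicted, "->", kf.kf.x[:4].flatten())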

34.2.4. Data Association: Hungarian Algorithm

Matching predicted boxes to new detections is an assignment problem: the Hungarian algorithm finds the one-to-one pairing that maximizes total IoU.

from scipy.optimize import linear_sum_assignment
import numpy as np

def compute_iou(box1: np.ndarray, box2: np.ndarray) -> float:
    """Compute IoU between two boxes [x1, y1, x2, y2]."""
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])
    
    inter = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    
    return inter / (area1 + area2 - inter + 1e-6)

def associate_detections(
    trackers: list,
    detections: list,
    iou_threshold: float = 0.3
) -> tuple:
    """
    Associate detections to existing trackers using Hungarian algorithm.
    
    Returns:
        matches: List of (tracker_idx, detection_idx)
        unmatched_trackers: List of tracker indices
        unmatched_detections: List of detection indices
    """
    if len(trackers) == 0:
        return [], [], list(range(len(detections)))
    
    if len(detections) == 0:
        return [], list(range(len(trackers))), []
    
    # Build IoU matrix (the benefit of each tracker-detection pairing)
    iou_matrix = np.zeros((len(trackers), len(detections)))
    for t, trk in enumerate(trackers):
        for d, det in enumerate(detections):
            iou_matrix[t, d] = compute_iou(trk, det)
    
    # Hungarian algorithm (scipy minimizes, so use negative)
    row_ind, col_ind = linear_sum_assignment(-iou_matrix)
    
    matches = []
    for r, c in zip(row_ind, col_ind):
        if iou_matrix[r, c] >= iou_threshold:
            matches.append((r, c))
    
    unmatched_trackers = [t for t in range(len(trackers)) if t not in [m[0] for m in matches]]
    unmatched_detections = [d for d in range(len(detections)) if d not in [m[1] for m in matches]]
    
    return matches, unmatched_trackers, unmatched_detections
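A quick sanity check with toy boxes:

trackers = [np.array([0, 0, 10, 10]), np.array([50, 50, 60, 60])]
detections = [
    np.array([52, 51, 61, 62]),      # overlaps tracker 1
    np.array([1, 1, 11, 11]),        # overlaps tracker 0
    np.array([200, 200, 210, 210]),  # no overlap -> new track
]
matches, lost, new = associate_detections(trackers, detections)
print(matches)      # [(0, 1), (1, 0)]
print(lost, new)    # [] [2]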

34.2.5. Spatial Databases: PostGIS

-- Schema for spatial tracking
CREATE TABLE object_tracks (
    track_id UUID PRIMARY KEY,
    object_class VARCHAR(50),
    created_at TIMESTAMP,
    last_seen TIMESTAMP,
    trajectory GEOMETRY(LINESTRING, 4326)
);

CREATE TABLE track_points (
    id SERIAL PRIMARY KEY,
    track_id UUID REFERENCES object_tracks(track_id),
    timestamp TIMESTAMP,
    location GEOMETRY(POINT, 4326),
    confidence FLOAT,
    bbox JSONB
);

-- Spatial index for fast queries
CREATE INDEX idx_track_points_location 
ON track_points USING GIST(location);

-- Query: Objects in polygon
SELECT DISTINCT track_id 
FROM track_points 
WHERE ST_Within(location, ST_GeomFromGeoJSON(?));

-- Query: Objects that crossed a line
SELECT track_id 
FROM object_tracks 
WHERE ST_Crosses(trajectory, ST_SetSRID(ST_MakeLine(
    ST_Point(-122.4, 37.7),
    ST_Point(-122.3, 37.8)
), 4326));  -- SRID must match the trajectory column
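One way to feed tracker output into this schema; a sketch assuming psycopg2 and an illustrative connection string (pixel-to-world conversion is covered in Section 34.2.8; the bbox column is omitted here):

import psycopg2

conn = psycopg2.connect("dbname=tracking")   # illustrative connection string

def insert_track_point(track_id, ts, lon, lat, conf):
    """Append one observation to track_points."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO track_points (track_id, timestamp, location, confidence)
            VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s)
            """,
            (track_id, ts, lon, lat, conf),
        )
    conn.commit()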

34.2.6. Geofencing and Loitering Detection

from datetime import datetime, timedelta
from dataclasses import dataclass
from shapely.geometry import Point, Polygon
from typing import Dict, List

@dataclass
class GeofenceEvent:
    track_id: str
    event_type: str  # 'enter', 'exit', 'loiter'
    timestamp: datetime
    duration: float = 0.0

class GeofenceMonitor:
    """Monitor objects entering/exiting/loitering in zones."""
    
    def __init__(self, zones: Dict[str, Polygon], loiter_threshold: float = 300):
        self.zones = zones
        self.loiter_threshold = loiter_threshold  # seconds
        self.track_states: Dict[str, Dict] = {}
    
    def update(self, track_id: str, x: float, y: float, timestamp: datetime) -> List[GeofenceEvent]:
        """Update track position and check for events."""
        events = []
        point = Point(x, y)
        
        if track_id not in self.track_states:
            self.track_states[track_id] = {}
        
        for zone_name, polygon in self.zones.items():
            inside = polygon.contains(point)
            state = self.track_states[track_id].get(zone_name, {
                'inside': False,
                'enter_time': None,
                'consecutive_outside': 0,
                'loiter_reported': False
            })
            
            if inside and not state['inside']:
                # Enter event
                state['inside'] = True
                state['enter_time'] = timestamp
                state['consecutive_outside'] = 0
                state['loiter_reported'] = False
                events.append(GeofenceEvent(
                    track_id=track_id,
                    event_type='enter',
                    timestamp=timestamp
                ))
            
            elif not inside and state['inside']:
                # Potential exit (use hysteresis)
                state['consecutive_outside'] += 1
                if state['consecutive_outside'] >= 3:
                    duration = (timestamp - state['enter_time']).total_seconds()
                    state['inside'] = False
                    events.append(GeofenceEvent(
                        track_id=track_id,
                        event_type='exit',
                        timestamp=timestamp,
                        duration=duration
                    ))
            
            elif inside and state['inside']:
                # Check for loitering (report once per visit, not every frame)
                state['consecutive_outside'] = 0
                duration = (timestamp - state['enter_time']).total_seconds()
                if duration >= self.loiter_threshold and not state['loiter_reported']:
                    state['loiter_reported'] = True
                    events.append(GeofenceEvent(
                        track_id=track_id,
                        event_type='loiter',
                        timestamp=timestamp,
                        duration=duration
                    ))
            
            self.track_states[track_id][zone_name] = state
        
        return events
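Usage sketch with a single square zone and invented coordinates and timestamps:

zones = {"loading_dock": Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])}
monitor = GeofenceMonitor(zones, loiter_threshold=300)

t0 = datetime(2024, 1, 1, 12, 0, 0)
print([e.event_type for e in monitor.update("person_7", 5, 5, t0)])   # ['enter']
later = t0 + timedelta(seconds=301)
print([e.event_type for e in monitor.update("person_7", 6, 5, later)])  # ['loiter']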

34.2.7. Multi-Camera Tracking (Re-Identification)

graph LR
    A[Camera A] -->|Crop| B[ResNet Encoder]
    B -->|Vector| C[(Vector DB)]
    
    D[Camera B] -->|Crop| E[ResNet Encoder]
    E -->|Query| C
    C -->|Match: Car #42| F{Merge IDs}

Implementation

import numpy as np
import torch
from torchvision import models, transforms
import faiss

class ReIDMatcher:
    """Re-identification across cameras using appearance embeddings."""
    
    def __init__(self, embedding_dim: int = 2048):
        self.encoder = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.encoder.fc = torch.nn.Identity()  # Remove classifier
        self.encoder.eval()
        
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((256, 128)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        
        # FAISS index for fast similarity search
        self.index = faiss.IndexFlatIP(embedding_dim)
        self.id_map = []
    
    def extract_embedding(self, crop: np.ndarray) -> np.ndarray:
        """Extract appearance embedding from object crop."""
        with torch.no_grad():
            x = self.transform(crop).unsqueeze(0)
            embedding = self.encoder(x)
            embedding = embedding.numpy().flatten()
            # L2 normalize for cosine similarity
            embedding = embedding / np.linalg.norm(embedding)
            return embedding
    
    def register(self, track_id: str, crop: np.ndarray):
        """Register a new track with its appearance."""
        embedding = self.extract_embedding(crop)
        self.index.add(embedding.reshape(1, -1))
        self.id_map.append(track_id)
    
    def match(self, crop: np.ndarray, threshold: float = 0.85) -> str | None:
        """Find the matching track ID, or None if nothing is similar enough."""
        if self.index.ntotal == 0:
            return None
        
        embedding = self.extract_embedding(crop)
        D, I = self.index.search(embedding.reshape(1, -1), k=1)
        
        # Inner product of L2-normalized vectors equals cosine similarity
        if D[0, 0] >= threshold:
            return self.id_map[I[0, 0]]
        return None
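Usage sketch with a dummy crop (in practice, crops come from detection bounding boxes):

matcher = ReIDMatcher()

# A stand-in crop; real crops are detector outputs from camera frames
crop = (np.random.rand(128, 64, 3) * 255).astype(np.uint8)
matcher.register("cam_a_car_42", crop)

# On another camera, query the gallery for a matching identity
print(matcher.match(crop, threshold=0.85))   # 'cam_a_car_42' (similarity 1.0)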

34.2.8. Camera Calibration and Homography

import cv2
import numpy as np

class HomographyTransform:
    """Transform between pixel and world coordinates."""
    
    def __init__(self, pixel_points: np.ndarray, world_points: np.ndarray):
        """
        Args:
            pixel_points: 4+ points in image [u, v]
            world_points: Corresponding world coords [x, y]
        """
        self.H, _ = cv2.findHomography(pixel_points, world_points)
        self.H_inv, _ = cv2.findHomography(world_points, pixel_points)
    
    def pixel_to_world(self, u: float, v: float) -> tuple:
        """Convert pixel to world coordinates."""
        point = np.array([[[u, v]]], dtype='float32')
        transformed = cv2.perspectiveTransform(point, self.H)
        return float(transformed[0, 0, 0]), float(transformed[0, 0, 1])
    
    def world_to_pixel(self, x: float, y: float) -> tuple:
        """Convert world to pixel coordinates."""
        point = np.array([[[x, y]]], dtype='float32')
        transformed = cv2.perspectiveTransform(point, self.H_inv)
        return int(transformed[0, 0, 0]), int(transformed[0, 0, 1])
    
    def compute_speed(self, track_history: list, fps: float) -> float:
        """Compute real-world speed from track history."""
        if len(track_history) < 2:
            return 0.0
        
        # Convert to world coordinates
        world_points = [self.pixel_to_world(p[0], p[1]) for p in track_history]
        
        # Compute distance
        total_dist = 0
        for i in range(1, len(world_points)):
            dx = world_points[i][0] - world_points[i-1][0]
            dy = world_points[i][1] - world_points[i-1][1]
            total_dist += np.sqrt(dx**2 + dy**2)
        
        # Speed = distance / elapsed time (N points span N-1 frame intervals)
        elapsed = (len(track_history) - 1) / fps
        return total_dist / elapsed if elapsed > 0 else 0.0
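Calibration sketch with four invented pixel-to-metre correspondences, converting the resulting speed to km/h:

pixel = np.array([[100, 400], [500, 400], [480, 100], [120, 100]], dtype=np.float32)
world = np.array([[0, 0], [7, 0], [7, 30], [0, 30]], dtype=np.float32)  # metres
transform = HomographyTransform(pixel, world)

track = [(300, 380), (305, 360), (310, 340)]    # pixel centers, one per frame
speed = transform.compute_speed(track, fps=30)  # metres per second
print(f"{speed * 3.6:.1f} km/h")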

34.2.9. Metrics: MOTA and IDF1
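MOTA counts every miss (FN), false alarm (FP), and identity switch (IDSW) against the total number of ground-truth objects:

$$\text{MOTA} = 1 - \frac{\text{FN} + \text{FP} + \text{IDSW}}{\text{GT}}$$

IDF1 is the F1 score of correctly identified detections, so it rewards keeping the right identity over time. MOTA can look good while IDs churn, so report both.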

import motmetrics as mm

class TrackingEvaluator:
    """Evaluate MOT performance."""
    
    def __init__(self):
        self.acc = mm.MOTAccumulator(auto_id=True)
    
    def update_frame(
        self,
        gt_ids: list,
        gt_boxes: list,
        pred_ids: list,
        pred_boxes: list
    ):
        """Add frame results."""
        distances = mm.distances.iou_matrix(
            gt_boxes, pred_boxes, max_iou=0.5
        )
        self.acc.update(gt_ids, pred_ids, distances)
    
    def compute_metrics(self) -> dict:
        """Compute final metrics."""
        mh = mm.metrics.create()
        summary = mh.compute(
            self.acc,
            metrics=['mota', 'motp', 'idf1', 'num_switches', 'mostly_tracked', 'mostly_lost']
        )
        return summary.to_dict('records')[0]
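Per-frame usage sketch; boxes are [x, y, width, height]:

evaluator = TrackingEvaluator()
evaluator.update_frame(
    gt_ids=[1], gt_boxes=[[10, 10, 50, 80]],      # one ground-truth object
    pred_ids=[7], pred_boxes=[[12, 11, 50, 78]],  # one overlapping prediction
)
print(evaluator.compute_metrics())  # {'mota': ..., 'idf1': ..., ...}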

34.2.10. Summary Checklist

| Step | Action | Tool |
|---|---|---|
| 1 | Detect objects | YOLO, EfficientDet |
| 2 | Track across frames | ByteTrack, DeepSORT |
| 3 | Store trajectories | PostGIS |
| 4 | Detect geofence events | Shapely |
| 5 | Match across cameras | ReID + FAISS |
| 6 | Evaluate performance | py-motmetrics |

[End of Section 34.2]