34.2. Spatial Consistency & Object Tracking
Important
Detection vs. Tracking: YOLO gives you “Car at [x,y]” for a single frame. It has no memory. Tracking gives you “Car #42 has moved from A to B.” Without tracking, you cannot count cars, measure dwell time, or detect loitering.
34.2.1. The Tracking Hierarchy
graph TB
A[Object Detection] --> B["I see a car"]
C[Multi-Object Tracking] --> D["I see Car #1 and Car #2 across frames"]
E[Multi-Camera Tracking] --> F["Car #1 left Cam A, entered Cam B"]
A --> C --> E
| Level | Capability | Algorithm | Use Case |
|---|---|---|---|
| OD | Single-frame detection | YOLO, EfficientDet | Per-frame object counts |
| MOT | Cross-frame tracking | DeepSORT, ByteTrack | Path analysis |
| MCT | Cross-camera tracking | ReID | City-wide tracking |
34.2.2. Algorithms: SORT and DeepSORT
SORT (Simple Online and Realtime Tracking)
| Component | Function |
|---|---|
| Kalman Filter | Predict next box position |
| IoU Matching | Associate predictions with detections |
| Track Management | Birth/death of tracks |
Pros: Extremely fast; runs in real time on a CPU.
Cons: Motion-only matching fails under occlusion, causing frequent ID switches.
DeepSORT
DeepSORT adds a deep appearance descriptor (a CNN embedding of each detection crop), so tracks can be re-associated after occlusions that defeat IoU-only matching:
import torch
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
class DeepSORTTracker:
def __init__(self, max_age: int = 30, n_init: int = 3):
self.tracker = DeepSort(
max_age=max_age,
n_init=n_init,
embedder="mobilenet",
embedder_gpu=True
)
def update(self, detections: list, frame: np.ndarray) -> list:
"""
Update tracks with new detections.
Args:
            detections: list of ([left, top, width, height], confidence, class)
                tuples, the raw-detection format deep_sort_realtime expects
frame: BGR image for appearance extraction
Returns:
List of tracks with IDs
"""
tracks = self.tracker.update_tracks(detections, frame=frame)
results = []
for track in tracks:
if not track.is_confirmed():
continue
track_id = track.track_id
bbox = track.to_ltrb() # Left, Top, Right, Bottom
results.append({
'id': track_id,
'bbox': bbox,
'age': track.age,
'hits': track.hits
})
return results
ByteTrack (State of the Art)
ByteTrack keeps both high- and low-confidence detections: high-confidence boxes are matched to existing tracks first, then the remaining tracks are matched against the low-confidence boxes, which recovers partially occluded objects instead of discarding them:
class ByteTrackAdapter:
"""Wrapper for ByteTrack algorithm."""
def __init__(self, track_thresh: float = 0.5, match_thresh: float = 0.8):
from byte_tracker import BYTETracker
self.tracker = BYTETracker(
track_thresh=track_thresh,
match_thresh=match_thresh,
track_buffer=30,
frame_rate=30
)
def update(self, detections: np.ndarray) -> list:
"""
Args:
detections: [x1, y1, x2, y2, score] per detection
"""
online_targets = self.tracker.update(detections)
return [
{'id': t.track_id, 'bbox': t.tlbr, 'score': t.score}
for t in online_targets
]
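A usage sketch for the per-frame loop, assuming the Ultralytics YOLOv8 API as the upstream detector and a placeholder video path; ByteTrackAdapter is the wrapper defined above:

import cv2
import numpy as np
from ultralytics import YOLO  # assumption: YOLOv8 as the detector

model = YOLO("yolov8n.pt")
tracker = ByteTrackAdapter(track_thresh=0.5, match_thresh=0.8)

cap = cv2.VideoCapture("traffic.mp4")  # placeholder input
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    result = model(frame, verbose=False)[0]
    # Stack into the [x1, y1, x2, y2, score] rows the adapter expects
    dets = np.hstack([
        result.boxes.xyxy.cpu().numpy(),
        result.boxes.conf.cpu().numpy().reshape(-1, 1),
    ])
    for t in tracker.update(dets):
        print(t['id'], t['bbox'])
cap.release()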
34.2.3. The Kalman Filter
State vector for 2D bounding box: $[u, v, s, r, \dot{u}, \dot{v}, \dot{s}]$
| Variable | Meaning |
|---|---|
| u, v | Center position |
| s | Scale (area) |
| r | Aspect ratio |
| $\dot{u}, \dot{v}, \dot{s}$ | Velocities |
Implementation
from filterpy.kalman import KalmanFilter
import numpy as np
class BoxKalmanFilter:
"""Kalman filter for bounding box tracking."""
def __init__(self):
self.kf = KalmanFilter(dim_x=7, dim_z=4)
# State transition (constant velocity model)
self.kf.F = np.array([
[1, 0, 0, 0, 1, 0, 0],
[0, 1, 0, 0, 0, 1, 0],
[0, 0, 1, 0, 0, 0, 1],
[0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 1]
])
        # Measurement matrix (we observe u, v, s, r)
self.kf.H = np.array([
[1, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0, 0]
])
# Measurement noise
self.kf.R *= 10
# Process noise
self.kf.Q[-1, -1] *= 0.01
self.kf.Q[4:, 4:] *= 0.01
def predict(self) -> np.ndarray:
"""Predict next state."""
self.kf.predict()
return self.kf.x[:4].flatten()
def update(self, measurement: np.ndarray):
"""Update with observation."""
self.kf.update(measurement)
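For reference, a small pair of helper functions (an illustrative sketch, not part of any library) for converting between the $[u, v, s, r]$ state used above and corner-format boxes, which is what you need when feeding detections into update() and turning predictions back into drawable boxes:

import numpy as np

def box_to_state(box: np.ndarray) -> np.ndarray:
    """[x1, y1, x2, y2] -> [u, v, s, r] (centre, area, aspect ratio)."""
    w, h = box[2] - box[0], box[3] - box[1]
    return np.array([box[0] + w / 2, box[1] + h / 2, w * h, w / h])

def state_to_box(state: np.ndarray) -> np.ndarray:
    """[u, v, s, r] -> [x1, y1, x2, y2]."""
    w = np.sqrt(state[2] * state[3])  # s = w * h and r = w / h, so w = sqrt(s * r)
    h = state[2] / w
    return np.array([state[0] - w / 2, state[1] - h / 2,
                     state[0] + w / 2, state[1] + h / 2])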
34.2.4. Data Association: Hungarian Algorithm
from scipy.optimize import linear_sum_assignment
import numpy as np
def compute_iou(box1: np.ndarray, box2: np.ndarray) -> float:
"""Compute IoU between two boxes [x1, y1, x2, y2]."""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
inter = max(0, x2 - x1) * max(0, y2 - y1)
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
return inter / (area1 + area2 - inter + 1e-6)
def associate_detections(
trackers: list,
detections: list,
iou_threshold: float = 0.3
) -> tuple:
"""
Associate detections to existing trackers using Hungarian algorithm.
Returns:
matches: List of (tracker_idx, detection_idx)
unmatched_trackers: List of tracker indices
unmatched_detections: List of detection indices
"""
if len(trackers) == 0:
return [], [], list(range(len(detections)))
if len(detections) == 0:
return [], list(range(len(trackers))), []
# Build cost matrix (1 - IoU)
iou_matrix = np.zeros((len(trackers), len(detections)))
for t, trk in enumerate(trackers):
for d, det in enumerate(detections):
iou_matrix[t, d] = compute_iou(trk, det)
# Hungarian algorithm (scipy minimizes, so use negative)
row_ind, col_ind = linear_sum_assignment(-iou_matrix)
matches = []
for r, c in zip(row_ind, col_ind):
if iou_matrix[r, c] >= iou_threshold:
matches.append((r, c))
unmatched_trackers = [t for t in range(len(trackers)) if t not in [m[0] for m in matches]]
unmatched_detections = [d for d in range(len(detections)) if d not in [m[1] for m in matches]]
return matches, unmatched_trackers, unmatched_detections
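To show how association drives the birth and death of tracks, here is a minimal IoU-only tracker sketch built on associate_detections() from the listing above; the class name, the dict-based track store, and max_age are illustrative choices rather than an existing library, and a production tracker would add the Kalman prediction step from 34.2.3:

class MinimalIoUTracker:
    """Toy tracker: IoU association plus track birth/death, no motion model."""
    def __init__(self, max_age: int = 5, iou_threshold: float = 0.3):
        self.max_age = max_age
        self.iou_threshold = iou_threshold
        self.tracks = {}   # track_id -> {'bbox': [x1, y1, x2, y2], 'misses': int}
        self.next_id = 0

    def update(self, detections: list) -> dict:
        """detections: list of [x1, y1, x2, y2] boxes for the current frame."""
        ids = list(self.tracks.keys())
        boxes = [self.tracks[i]['bbox'] for i in ids]
        matches, unmatched_trk, unmatched_det = associate_detections(
            boxes, detections, self.iou_threshold
        )
        # Matched tracks: adopt the new box and reset the miss counter
        for t_idx, d_idx in matches:
            self.tracks[ids[t_idx]] = {'bbox': detections[d_idx], 'misses': 0}
        # Unmatched tracks: age them out (track death)
        for t_idx in unmatched_trk:
            self.tracks[ids[t_idx]]['misses'] += 1
            if self.tracks[ids[t_idx]]['misses'] > self.max_age:
                del self.tracks[ids[t_idx]]
        # Unmatched detections: start new tracks (track birth)
        for d_idx in unmatched_det:
            self.tracks[self.next_id] = {'bbox': detections[d_idx], 'misses': 0}
            self.next_id += 1
        return {tid: t['bbox'] for tid, t in self.tracks.items() if t['misses'] == 0}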
34.2.5. Spatial Databases: PostGIS
-- Schema for spatial tracking
CREATE TABLE object_tracks (
track_id UUID PRIMARY KEY,
object_class VARCHAR(50),
created_at TIMESTAMP,
last_seen TIMESTAMP,
trajectory GEOMETRY(LINESTRING, 4326)
);
CREATE TABLE track_points (
id SERIAL PRIMARY KEY,
track_id UUID REFERENCES object_tracks(track_id),
timestamp TIMESTAMP,
location GEOMETRY(POINT, 4326),
confidence FLOAT,
bbox JSONB
);
-- Spatial index for fast queries
CREATE INDEX idx_track_points_location
ON track_points USING GIST(location);
-- Query: Objects in polygon
SELECT DISTINCT track_id
FROM track_points
WHERE ST_Within(location, ST_GeomFromGeoJSON(?));
-- Query: Objects that crossed a line
SELECT track_id
FROM object_tracks
WHERE ST_Crosses(trajectory, ST_SetSRID(ST_MakeLine(
    ST_Point(-122.4, 37.7),
    ST_Point(-122.3, 37.8)
), 4326));
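A sketch of writing one observation into the track_points table with psycopg2; the driver choice, the connection handling, and the assumption that positions have already been converted to lon/lat (for example via the homography in 34.2.8 plus geo-registration) are all placeholders:

import json
import psycopg2  # assumption: psycopg2 as the Postgres driver

def insert_track_point(conn, track_id, ts, lon, lat, confidence, bbox):
    """Append one observation; bbox is the pixel-space [x1, y1, x2, y2]."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO track_points (track_id, timestamp, location, confidence, bbox)
            VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s)
            """,
            (track_id, ts, lon, lat, confidence, json.dumps(bbox)),
        )
    conn.commit()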
34.2.6. Geofencing and Loitering Detection
from datetime import datetime, timedelta
from dataclasses import dataclass
from shapely.geometry import Point, Polygon
from typing import Dict, List
@dataclass
class GeofenceEvent:
track_id: str
event_type: str # 'enter', 'exit', 'loiter'
timestamp: datetime
duration: float = 0.0
class GeofenceMonitor:
"""Monitor objects entering/exiting/loitering in zones."""
def __init__(self, zones: Dict[str, Polygon], loiter_threshold: float = 300):
self.zones = zones
self.loiter_threshold = loiter_threshold # seconds
self.track_states: Dict[str, Dict] = {}
def update(self, track_id: str, x: float, y: float, timestamp: datetime) -> List[GeofenceEvent]:
"""Update track position and check for events."""
events = []
point = Point(x, y)
if track_id not in self.track_states:
self.track_states[track_id] = {}
for zone_name, polygon in self.zones.items():
inside = polygon.contains(point)
            state = self.track_states[track_id].get(zone_name, {
                'inside': False,
                'enter_time': None,
                'consecutive_outside': 0,
                'loitered': False
            })
if inside and not state['inside']:
# Enter event
state['inside'] = True
state['enter_time'] = timestamp
                state['consecutive_outside'] = 0
                state['loitered'] = False  # allow a fresh loiter alert for this visit
events.append(GeofenceEvent(
track_id=track_id,
event_type='enter',
timestamp=timestamp
))
elif not inside and state['inside']:
# Potential exit (use hysteresis)
state['consecutive_outside'] += 1
if state['consecutive_outside'] >= 3:
duration = (timestamp - state['enter_time']).total_seconds()
state['inside'] = False
events.append(GeofenceEvent(
track_id=track_id,
event_type='exit',
timestamp=timestamp,
duration=duration
))
            elif inside and state['inside']:
                # Still inside: check for loitering (alert once per visit)
                state['consecutive_outside'] = 0
                duration = (timestamp - state['enter_time']).total_seconds()
                if duration >= self.loiter_threshold and not state['loitered']:
                    state['loitered'] = True
                    events.append(GeofenceEvent(
                        track_id=track_id,
                        event_type='loiter',
                        timestamp=timestamp,
                        duration=duration
                    ))
self.track_states[track_id][zone_name] = state
return events
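A usage sketch, assuming the GeofenceMonitor above is in scope; the zone polygon and coordinates are made-up values in whatever units the tracker reports (pixels here, or world coordinates after the homography in 34.2.8):

zones = {
    'loading_dock': Polygon([(100, 100), (400, 100), (400, 300), (100, 300)])
}
monitor = GeofenceMonitor(zones, loiter_threshold=300)  # alert after 5 minutes inside

# Called once per tracked position, typically every frame
for event in monitor.update('car_42', x=250, y=180, timestamp=datetime.utcnow()):
    print(event.event_type, event.track_id, event.duration)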
34.2.7. Multi-Camera Tracking (Re-Identification)
graph LR
A[Camera A] -->|Crop| B[ResNet Encoder]
B -->|Vector| C[(Vector DB)]
D[Camera B] -->|Crop| E[ResNet Encoder]
E -->|Query| C
C -->|Match: Car #42| F{Merge IDs}
Implementation
import numpy as np
import torch
from torchvision import models, transforms
from typing import Optional
import faiss
class ReIDMatcher:
"""Re-identification across cameras using appearance embeddings."""
def __init__(self, embedding_dim: int = 2048):
self.encoder = models.resnet50(pretrained=True)
self.encoder.fc = torch.nn.Identity() # Remove classifier
self.encoder.eval()
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((256, 128)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# FAISS index for fast similarity search
self.index = faiss.IndexFlatIP(embedding_dim)
self.id_map = []
def extract_embedding(self, crop: np.ndarray) -> np.ndarray:
"""Extract appearance embedding from object crop."""
with torch.no_grad():
x = self.transform(crop).unsqueeze(0)
embedding = self.encoder(x)
embedding = embedding.numpy().flatten()
# L2 normalize for cosine similarity
embedding = embedding / np.linalg.norm(embedding)
return embedding
def register(self, track_id: str, crop: np.ndarray):
"""Register a new track with its appearance."""
embedding = self.extract_embedding(crop)
self.index.add(embedding.reshape(1, -1))
self.id_map.append(track_id)
    def match(self, crop: np.ndarray, threshold: float = 0.85) -> Optional[str]:
"""Find matching track ID or return None."""
embedding = self.extract_embedding(crop)
D, I = self.index.search(embedding.reshape(1, -1), k=1)
if D[0, 0] >= threshold:
return self.id_map[I[0, 0]]
return None
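A usage sketch assuming the ReIDMatcher above is in scope; the crops are synthetic arrays just to show the call pattern, whereas in practice they are the detector's bounding-box crops from each camera's frame:

matcher = ReIDMatcher()

# Camera A: register the appearance of a confirmed local track
crop_cam_a = np.random.randint(0, 255, (128, 64, 3), dtype=np.uint8)
matcher.register('cam_a_track_7', crop_cam_a)

# Camera B: query whether a new local track matches a known identity
crop_cam_b = np.random.randint(0, 255, (128, 64, 3), dtype=np.uint8)
global_id = matcher.match(crop_cam_b, threshold=0.85)
if global_id is not None:
    print(f"Camera B object matches {global_id}; merge the track IDs")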
34.2.8. Camera Calibration and Homography
import cv2
import numpy as np
class HomographyTransform:
"""Transform between pixel and world coordinates."""
def __init__(self, pixel_points: np.ndarray, world_points: np.ndarray):
"""
Args:
pixel_points: 4+ points in image [u, v]
world_points: Corresponding world coords [x, y]
"""
self.H, _ = cv2.findHomography(pixel_points, world_points)
self.H_inv, _ = cv2.findHomography(world_points, pixel_points)
def pixel_to_world(self, u: float, v: float) -> tuple:
"""Convert pixel to world coordinates."""
point = np.array([[[u, v]]], dtype='float32')
transformed = cv2.perspectiveTransform(point, self.H)
return float(transformed[0, 0, 0]), float(transformed[0, 0, 1])
def world_to_pixel(self, x: float, y: float) -> tuple:
"""Convert world to pixel coordinates."""
point = np.array([[[x, y]]], dtype='float32')
transformed = cv2.perspectiveTransform(point, self.H_inv)
return int(transformed[0, 0, 0]), int(transformed[0, 0, 1])
def compute_speed(self, track_history: list, fps: float) -> float:
"""Compute real-world speed from track history."""
if len(track_history) < 2:
return 0.0
# Convert to world coordinates
world_points = [self.pixel_to_world(p[0], p[1]) for p in track_history]
# Compute distance
total_dist = 0
for i in range(1, len(world_points)):
dx = world_points[i][0] - world_points[i-1][0]
dy = world_points[i][1] - world_points[i-1][1]
total_dist += np.sqrt(dx**2 + dy**2)
        # Speed = distance / elapsed time (N points span N - 1 frame intervals)
        elapsed = (len(track_history) - 1) / fps
        return total_dist / elapsed if elapsed > 0 else 0.0
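A calibration and usage sketch; the four pixel/world correspondences are hypothetical values (for example, measured corners of road markings), with world coordinates in metres on the ground plane:

pixel_pts = np.array([[100, 700], [1800, 700], [1200, 300], [600, 300]], dtype=np.float32)
world_pts = np.array([[0, 0], [20, 0], [20, 40], [0, 40]], dtype=np.float32)

transform = HomographyTransform(pixel_pts, world_pts)
x, y = transform.pixel_to_world(960, 500)  # ground-plane position in metres

# Speed from consecutive per-frame centre points (metres per second if the
# world coordinates are metres and the points are one frame apart)
history = [(950, 690), (952, 660), (955, 630)]
speed = transform.compute_speed(history, fps=30)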
34.2.9. Metrics: MOTA and IDF1
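MOTA (Multiple Object Tracking Accuracy) aggregates the three error types that matter for tracking: missed objects (FN), false alarms (FP), and identity switches (IDSW), normalised by the number of ground-truth objects:

$$\text{MOTA} = 1 - \frac{\text{FN} + \text{FP} + \text{IDSW}}{\text{GT}}$$

IDF1 instead measures identity consistency: it is the F1 score of correctly identified detections. A tracker that fragments one object into many short IDs can still post a decent MOTA but will score poorly on IDF1, so report both.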
import motmetrics as mm
class TrackingEvaluator:
"""Evaluate MOT performance."""
def __init__(self):
self.acc = mm.MOTAccumulator(auto_id=True)
def update_frame(
self,
gt_ids: list,
gt_boxes: list,
pred_ids: list,
pred_boxes: list
):
"""Add frame results."""
distances = mm.distances.iou_matrix(
gt_boxes, pred_boxes, max_iou=0.5
)
self.acc.update(gt_ids, pred_ids, distances)
def compute_metrics(self) -> dict:
"""Compute final metrics."""
mh = mm.metrics.create()
summary = mh.compute(
self.acc,
metrics=['mota', 'motp', 'idf1', 'num_switches', 'mostly_tracked', 'mostly_lost']
)
return summary.to_dict('records')[0]
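An evaluation sketch with two frames of toy ground truth and predictions; boxes are [x, y, width, height] and the ID values are arbitrary labels:

evaluator = TrackingEvaluator()

evaluator.update_frame(
    gt_ids=['a', 'b'], gt_boxes=[[10, 10, 50, 80], [200, 40, 60, 90]],
    pred_ids=[1, 2], pred_boxes=[[12, 11, 50, 80], [150, 40, 60, 90]],
)
evaluator.update_frame(
    gt_ids=['a', 'b'], gt_boxes=[[14, 10, 50, 80], [205, 40, 60, 90]],
    pred_ids=[1, 3], pred_boxes=[[15, 11, 50, 80], [204, 41, 60, 90]],
)
print(evaluator.compute_metrics())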
34.2.10. Summary Checklist
| Step | Action | Tool |
|---|---|---|
| 1 | Detect objects | YOLO, EfficientDet |
| 2 | Track across frames | ByteTrack, DeepSORT |
| 3 | Store trajectories | PostGIS |
| 4 | Detect geofence events | Shapely |
| 5 | Match across cameras | ReID + FAISS |
| 6 | Evaluate performance | py-motmetrics |
[End of Section 34.2]