发布网站建设信息,北海建设厅官方网站,河南工程建筑信息网,怎么制作游戏脚本本文首先将介绍在目标跟踪任务中常用的匈牙利算法#xff08;Hungarian Algorithm#xff09;和卡尔曼滤波#xff08;Kalman Filter#xff09;#xff0c;然后介绍经典算法DeepSORT的工作流程以及对相关源码进行解析。 目前主流的目标跟踪算法都是基于Tracking-by-Detec…本文首先将介绍在目标跟踪任务中常用的匈牙利算法Hungarian Algorithm和卡尔曼滤波Kalman Filter然后介绍经典算法DeepSORT的工作流程以及对相关源码进行解析。 目前主流的目标跟踪算法都是基于Tracking-by-Detecton策略即基于目标检测的结果来进行目标跟踪。DeepSORT运用的就是这个策略上面的视频是DeepSORT对人群进行跟踪的结果每个bbox左上角的数字是用来标识某个人的唯一ID号。
这里就有个问题视频中不同时刻的同一个人位置发生了变化那么是如何关联上的呢答案就是匈牙利算法和卡尔曼滤波。
匈牙利算法可以告诉我们当前帧的某个目标是否与前一帧的某个目标相同。卡尔曼滤波可以基于目标前一时刻的位置来预测当前时刻的位置并且可以比传感器在目标跟踪中即目标检测器比如Yolo等更准确的估计目标的位置。
匈牙利算法Hungarian Algorithm
首先先介绍一下什么是分配问题Assignment Problem假设有N个人和N个任务每个任务可以任意分配给不同的人已知每个人完成每个任务要花费的代价不尽相同那么如何分配可以使得总的代价最小。
举个例子假设现在有3个任务要分别分配给3个人每个人完成各个任务所需代价矩阵cost matrix。 怎样才能找到一个最优分配使得完成所有任务花费的代价最小呢
匈牙利算法又叫KM算法就是用来解决分配问题的一种方法它基于定理
如果代价矩阵的某一行或某一列同时加上或减去某个数则这个新的代价矩阵的最优分配仍然是原代价矩阵的最优分配。 算法步骤假设矩阵为NxN方阵
对于矩阵的每一行减去其中最小的元素对于矩阵的每一列减去其中最小的元素用最少的水平线或垂直线覆盖矩阵中所有的0如果线的数量等于N则找到了最优分配算法结束否则进入步骤5找到没有被任何线覆盖的最小元素每个没被线覆盖的行减去这个元素每个被线覆盖的列加上这个元素返回步骤3
继续拿上面的例子做演示
step1 每一行最小的元素分别为15、20、20减去得到 step2 每一列最小的元素分别为0、20、5减去得到 step3 用最少的水平线或垂直线覆盖所有的0得到 step4 线的数量为2小于3进入下一步
step5 现在没被覆盖的最小元素是5没被覆盖的行第一和第二行减去5得到 被覆盖的列第一列加上5得到 跳转到step3用最少的水平线或垂直线覆盖所有的0得到 step4线的数量为3满足条件算法结束。显然将任务2分配给第1个人、任务1分配给第2个人、任务3分配给第3个人时总的代价最小0000 所以原矩阵的最小总代价为40202585 sklearn里的linear_assignment()函数以及scipy里的linear_sum_assignment()函数都实现了匈牙利算法两者的返回值的形式不同 import numpy as np
from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignmentcost_matrix np.array([[15,40,45],[20,60,35],[20,40,25]
])matches linear_assignment(cost_matrix)
print(sklearn API result:\n, matches)
matches linear_sum_assignment(cost_matrix)
print(scipy API result:\n, matches)Outputs
sklearn API result:[[0 1][1 0][2 2]]
scipy API result:(array([0, 1, 2], dtypeint64), array([1, 0, 2], dtypeint64))在DeepSORT中匈牙利算法用来将前一帧中的跟踪框tracks与当前帧中的检测框detections进行关联通过外观信息appearance information和马氏距离Mahalanobis distance或者IOU来计算代价矩阵。
源码解读
# linear_assignment.py
def min_cost_matching(distance_metric, max_distance, tracks, detections, track_indicesNone, detection_indicesNone):...# 计算代价矩阵cost_matrix distance_metric(tracks, detections, track_indices, detection_indices)cost_matrix[cost_matrix max_distance] max_distance 1e-5# 执行匈牙利算法得到匹配成功的索引对行索引为tracks的索引列索引为detections的索引row_indices, col_indices linear_assignment(cost_matrix)matches, unmatched_tracks, unmatched_detections [], [], []# 找出未匹配的detectionsfor col, detection_idx in enumerate(detection_indices):if col not in col_indices:unmatched_detections.append(detection_idx)# 找出未匹配的tracksfor row, track_idx in enumerate(track_indices):if row not in row_indices:unmatched_tracks.append(track_idx)# 遍历匹配的(track, detection)索引对for row, col in zip(row_indices, col_indices):track_idx track_indices[row]detection_idx detection_indices[col]# 如果相应的cost大于阈值max_distance也视为未匹配成功if cost_matrix[row, col] max_distance:unmatched_tracks.append(track_idx)unmatched_detections.append(detection_idx)else:matches.append((track_idx, detection_idx))return matches, unmatched_tracks, unmatched_detections
卡尔曼滤波Kalman Filter
卡尔曼滤波被广泛应用于无人机、自动驾驶、卫星导航等领域简单来说其作用就是基于传感器的测量值来更新预测值以达到更精确的估计。
假设我们要跟踪小车的位置变化如下图所示蓝色的分布是卡尔曼滤波预测值棕色的分布是传感器的测量值灰色的分布就是预测值基于测量值更新后的最优估计。 在目标跟踪中需要估计track的以下两个状态
均值(Mean)表示目标的位置信息由bbox的中心坐标 (cx, cy)宽高比r高h以及各自的速度变化值组成由8维向量表示为 x [cx, cy, r, h, vx, vy, vr, vh]各个速度值初始化为0。协方差(Covariance )表示目标位置信息的不确定性由8x8的对角矩阵表示矩阵中数字越大则表明不确定性越大可以以任意值初始化。
卡尔曼滤波分为两个阶段(1) 预测track在下一时刻的位置(2) 基于detection来更新预测的位置。
下面将介绍这两个阶段用到的计算公式。这里不涉及公式的原理推导因为我也不清楚原理(ಥ_ಥ) 只是说明一下各个公式的作用
预测
基于track在 t-1时刻的状态来预测其在 t时刻的状态。 在公式1中x为track在t-1时刻的均值F称为状态转移矩阵该公式预测t时刻的x 矩阵F中的dt是当前帧和前一帧之间的差将等号右边的矩阵乘法展开可以得到cxcxdt*vxcycydt*vy...所以这里的卡尔曼滤波是一个匀速模型Constant Velocity Model。
在公式2中P为track在t-1时刻的协方差Q为系统的噪声矩阵代表整个系统的可靠程度一般初始化为很小的值该公式预测t时刻的P。
源码解读
# kalman_filter.py
def predict(self, mean, covariance):Run Kalman filter prediction step.Parameters----------mean: ndarray, the 8 dimensional mean vector of the object state at the previous time step.covariance: ndarray, the 8x8 dimensional covariance matrix of the object state at the previous time step.Returns-------(ndarray, ndarray), the mean vector and covariance matrix of the predicted state. Unobserved velocities are initialized to 0 mean.std_pos [self._std_weight_position * mean[3],self._std_weight_position * mean[3],1e-2,self._std_weight_position * mean[3]]std_vel [self._std_weight_velocity * mean[3],self._std_weight_velocity * mean[3],1e-5,self._std_weight_velocity * mean[3]]motion_cov np.diag(np.square(np.r_[std_pos, std_vel])) # 初始化噪声矩阵Qmean np.dot(self._motion_mat, mean) # x Fxcovariance np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) motion_cov # P FPF(T) Qreturn mean, covariance
更新
基于 t时刻检测到的detection校正与其关联的track的状态得到一个更精确的结果。 在公式3中z为detection的均值向量不包含速度变化值即z[cx, cy, r, h]H称为测量矩阵它将track的均值向量x映射到检测空间该公式计算detection和track的均值误差
在公式4中R为检测器的噪声矩阵它是一个4x4的对角矩阵对角线上的值分别为中心点两个坐标以及宽高的噪声以任意值初始化一般设置宽高的噪声大于中心点的噪声该公式先将协方差矩阵P映射到检测空间然后再加上噪声矩阵R
公式5计算卡尔曼增益K卡尔曼增益用于估计误差的重要程度
公式6和公式7得到更新后的均值向量x和协方差矩阵P。
源码解读
# kalman_filter.py
def project(self, mean, covariance):Project state distribution to measurement space.Parameters----------mean: ndarray, the states mean vector (8 dimensional array).covariance: ndarray, the states covariance matrix (8x8 dimensional).Returns-------(ndarray, ndarray), the projected mean and covariance matrix of the given state estimate.std [self._std_weight_position * mean[3],self._std_weight_position * mean[3],1e-1,self._std_weight_position * mean[3]]innovation_cov np.diag(np.square(std)) # 初始化噪声矩阵Rmean np.dot(self._update_mat, mean) # 将均值向量映射到检测空间即Hxcovariance np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T)) # 将协方差矩阵映射到检测空间即HPH^Treturn mean, covariance innovation_covdef update(self, mean, covariance, measurement):Run Kalman filter correction step.Parameters----------mean: ndarra, the predicted states mean vector (8 dimensional).covariance: ndarray, the states covariance matrix (8x8 dimensional).measurement: ndarray, the 4 dimensional measurement vector (x, y, a, h), where (x, y) is the center position, a the aspect ratio, and h the height of the bounding box.Returns-------(ndarray, ndarray), the measurement-corrected state distribution.# 将mean和covariance映射到检测空间得到Hx和Sprojected_mean, projected_cov self.project(mean, covariance)# 矩阵分解这一步没看懂chol_factor, lower scipy.linalg.cho_factor(projected_cov, lowerTrue, check_finiteFalse)# 计算卡尔曼增益K这一步没看明白是如何对应上公式5的求线代大佬指教kalman_gain scipy.linalg.cho_solve((chol_factor, lower), np.dot(covariance, self._update_mat.T).T,check_finiteFalse).T# z - Hxinnovation measurement - projected_mean# x x Kynew_mean mean np.dot(innovation, kalman_gain.T)# P (I - KH)Pnew_covariance covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))return new_mean, new_covariance
DeepSort工作流程
DeepSORT对每一帧的处理流程如下
检测器得到bbox → 生成detections → 卡尔曼滤波预测→ 使用匈牙利算法将预测后的tracks和当前帧中的detecions进行匹配级联匹配和IOU匹配 → 卡尔曼滤波更新 Frame 0检测器检测到了3个detections当前没有任何tracks将这3个detections初始化为tracks Frame 1检测器又检测到了3个detections对于Frame 0中的tracks先进行预测得到新的tracks然后使用匈牙利算法将新的tracks与detections进行匹配得到(track, detection)匹配对最后用每对中的detection更新对应的track 检测
使用Yolo作为检测器检测当前帧中的bbox
# demo_yolo3_deepsort.py
def detect(self):while self.vdo.grab():...bbox_xcycwh, cls_conf, cls_ids self.yolo3(im) # 检测到的bbox[cx,cy,w,h]置信度类别idif bbox_xcycwh is not None:# 筛选出人的类别mask cls_ids 0bbox_xcycwh bbox_xcycwh[mask]bbox_xcycwh[:, 3:] * 1.2cls_conf cls_conf[mask]...
生成detections
将检测到的bbox转换成detections
# deep_sort.py
def update(self, bbox_xywh, confidences, ori_img):self.height, self.width ori_img.shape[:2]# 提取每个bbox的featurefeatures self._get_features(bbox_xywh, ori_img)# [cx,cy,w,h] - [x1,y1,w,h]bbox_tlwh self._xywh_to_tlwh(bbox_xywh)# 过滤掉置信度小于self.min_confidence的bbox生成detectionsdetections [Detection(bbox_tlwh[i], conf, features[i]) for i,conf in enumerate(confidences) if conf self.min_confidence]# NMS (这里self.nms_max_overlap的值为1即保留了所有的detections)boxes np.array([d.tlwh for d in detections])scores np.array([d.confidence for d in detections])indices non_max_suppression(boxes, self.nms_max_overlap, scores)detections [detections[i] for i in indices]...
卡尔曼滤波预测阶段
使用卡尔曼滤波预测前一帧中的tracks在当前帧的状态
# track.py
def predict(self, kf):Propagate the state distribution to the current time step using a Kalman filter prediction step.Parameters----------kf: The Kalman filter.self.mean, self.covariance kf.predict(self.mean, self.covariance) # 预测self.age 1 # 该track自出现以来的总帧数加1self.time_since_update 1 # 该track自最近一次更新以来的总帧数加1
匹配
首先对基于外观信息的马氏距离计算tracks和detections的代价矩阵然后相继进行级联匹配和IOU匹配最后得到当前帧的所有匹配对、未匹配的tracks以及未匹配的detections
# tracker.py
def _match(self, detections):def gated_metric(racks, dets, track_indices, detection_indices):基于外观信息和马氏距离计算卡尔曼滤波预测的tracks和当前时刻检测到的detections的代价矩阵features np.array([dets[i].feature for i in detection_indices])targets np.array([tracks[i].track_id for i in track_indices]# 基于外观信息计算tracks和detections的余弦距离代价矩阵cost_matrix self.metric.distance(features, targets)# 基于马氏距离过滤掉代价矩阵中一些不合适的项 (将其设置为一个较大的值)cost_matrix linear_assignment.gate_cost_matrix(self.kf, cost_matrix, tracks, dets, track_indices, detection_indices)return cost_matrix# 区分开confirmed tracks和unconfirmed tracksconfirmed_tracks [i for i, t in enumerate(self.tracks) if t.is_confirmed()]unconfirmed_tracks [i for i, t in enumerate(self.tracks) if not t.is_confirmed()]# 对confirmd tracks进行级联匹配matches_a, unmatched_tracks_a, unmatched_detections \linear_assignment.matching_cascade(gated_metric, self.metric.matching_threshold, self.max_age,self.tracks, detections, confirmed_tracks)# 对级联匹配中未匹配的tracks和unconfirmed tracks中time_since_update为1的tracks进行IOU匹配iou_track_candidates unconfirmed_tracks [k for k in unmatched_tracks_a ifself.tracks[k].time_since_update 1]unmatched_tracks_a [k for k in unmatched_tracks_a ifself.tracks[k].time_since_update ! 1]matches_b, unmatched_tracks_b, unmatched_detections \linear_assignment.min_cost_matching(iou_matching.iou_cost, self.max_iou_distance, self.tracks,detections, iou_track_candidates, unmatched_detections)# 整合所有的匹配对和未匹配的tracksmatches matches_a matches_bunmatched_tracks list(set(unmatched_tracks_a unmatched_tracks_b))return matches, unmatched_tracks, unmatched_detections# 级联匹配源码 linear_assignment.py
def matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections, track_indicesNone, detection_indicesNone):...unmatched_detections detection_indicematches []# 由小到大依次对每个level的tracks做匹配for level in range(cascade_depth):# 如果没有detections退出循环if len(unmatched_detections) 0: break# 当前level的所有tracks索引track_indices_l [k for k in track_indices if tracks[k].time_since_update 1 level]# 如果当前level没有track继续if len(track_indices_l) 0: continue# 匈牙利匹配matches_l, _, unmatched_detections min_cost_matching(distance_metric, max_distance, tracks, detections, track_indices_l, unmatched_detections)matches matches_lunmatched_tracks list(set(track_indices) - set(k for k, _ in matches))return matches, unmatched_tracks, unmatched_detections
卡尔曼滤波更新阶段
对于每个匹配成功的track用其对应的detection进行更新并处理未匹配tracks和detections
# tracker.py
def update(self, detections):Perform measurement update and track management.Parameters----------detections: List[deep_sort.detection.Detection]A list of detections at the current time step.# 得到匹配对、未匹配的tracks、未匹配的dectectionsmatches, unmatched_tracks, unmatched_detections self._match(detections)# 对于每个匹配成功的track用其对应的detection进行更新for track_idx, detection_idx in matches:self.tracks[track_idx].update(self.kf, detections[detection_idx])# 对于未匹配的成功的track将其标记为丢失for track_idx in unmatched_tracks:self.tracks[track_idx].mark_missed()# 对于未匹配成功的detection初始化为新的trackfor detection_idx in unmatched_detections:self._initiate_track(detections[detection_idx])...