This is an auxiliary class that enables Video Stabilization for vidgear with minimalistic latency, and at the expense of little to no additional computational requirements.
The basic idea behind it is to track and save the salient feature array for the given number of frames, and then use these anchor points to cancel out all perturbations relative to them for the incoming frames in the queue. This class relies heavily on Threaded Queue mode for error-free & ultra-fast frame handling.
class Stabilizer:
    """
    This is an auxiliary class that enables Video Stabilization for vidgear with minimalistic latency, and at the
    expense of little to no additional computational requirements.

    The basic idea behind it is to track and save the salient feature array for the given number of frames, and then
    use these anchor points to cancel out all perturbations relative to them for the incoming frames in the queue.
    This class relies heavily on **Threaded Queue mode** for error-free & ultra-fast frame handling.
    """

    def __init__(
        self,
        smoothing_radius: int = 25,
        border_type: str = "black",
        border_size: int = 0,
        crop_n_zoom: bool = False,
        logging: bool = False,
    ):
        """
        This constructor method initializes the object state and attributes of the Stabilizer class.

        Parameters:
            smoothing_radius (int): alter averaging window size.
            border_type (str): changes the extended border type.
            border_size (int): enables and set the value for extended border size to reduce the black borders.
            crop_n_zoom (bool): enables cropping and zooming of frames(to original size) to reduce the black borders.
            logging (bool): enables/disables logging.
        """
        # enable logging if specified (non-bool values are coerced to False)
        self.__logging = logging if isinstance(logging, bool) else False
        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # initialize deques for handling input frames and its indexes
        self.__frame_queue = deque(maxlen=smoothing_radius)
        self.__frame_queue_indexes = deque(maxlen=smoothing_radius)

        # define and create Adaptive histogram equalization (AHE) object for optimizations
        self.__clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        # initialize global vars
        # averaging window, handles the quality of stabilization at expense of latency and sudden panning
        self.__smoothing_radius = smoothing_radius
        self.__smoothed_path = None  # handles the smoothed path with box filter
        self.__path = None  # handles path i.e cumulative sum of previous_2_current transformations along a axis
        self.__transforms = []  # handles previous_2_current transformations [dx,dy,da]
        # handles smoothed array of previous_2_current transformations w.r.t to frames
        self.__frame_transforms_smoothed = None
        self.__previous_gray = None  # handles previous gray frame
        # handles previous detect_GFTTed keypoints w.r.t previous gray frame
        self.__previous_keypoints = None
        # handles height and width of input frames
        # NOTE: `frame_width` is intentionally left un-mangled (public) for backward compatibility
        self.__frame_height, self.frame_width = (0, 0)
        # handles cropping and zooms frames to reduce the black borders from stabilization being too noticeable
        self.__crop_n_zoom = 0

        # check if crop_n_zoom defined
        if crop_n_zoom and border_size:
            self.__crop_n_zoom = border_size  # crops and zoom frame to original size
            self.__border_size = 0  # zero out border size
            self.__frame_size = None  # handles frame size for zooming
            if self.__logging:
                logger.debug("Setting Cropping margin {} pixels".format(border_size))
        else:
            # Add output borders to frame
            self.__border_size = border_size
            if self.__logging and border_size:
                logger.debug("Setting Border size {} pixels".format(border_size))

        # define valid border modes
        border_modes = {
            "black": cv2.BORDER_CONSTANT,
            "reflect": cv2.BORDER_REFLECT,
            "reflect_101": cv2.BORDER_REFLECT_101,
            "replicate": cv2.BORDER_REPLICATE,
            "wrap": cv2.BORDER_WRAP,
        }
        # choose valid border_mode from border_type
        if border_type in ["black", "reflect", "reflect_101", "replicate", "wrap"]:
            if not crop_n_zoom:
                # initialize global border mode variable
                self.__border_mode = border_modes[border_type]
                if self.__logging and border_type != "black":
                    logger.info("Setting Border type: {}".format(border_type))
            else:
                # log and reset to default (border type is incompatible with cropping)
                if self.__logging and border_type != "black":
                    logger.debug("Setting border type is disabled if cropping is enabled!")
                self.__border_mode = border_modes["black"]
        else:
            # otherwise log and reset to default mode
            if self.__logging:
                logger.debug("Invalid input border type!")
            self.__border_mode = border_modes["black"]

        # define OpenCV version
        self.__cv2_version = check_CV_version()

        # retrieve best interpolation
        self.__interpolation = retrieve_best_interpolation(
            ["INTER_LINEAR_EXACT", "INTER_LINEAR", "INTER_AREA"]
        )

        # define normalized box filter
        self.__box_filter = np.ones(smoothing_radius) / smoothing_radius

    def stabilize(self, frame: np.ndarray) -> Optional[np.ndarray]:
        """
        This method takes an unstabilized video frame, and returns a stabilized one.

        Parameters:
            frame (numpy.ndarray): inputs unstabilized video frames.

        **Returns:** A stabilized video frame, or None while the internal queue is still filling.
        """
        # check if frame is None
        if frame is None:
            # return if it does
            return

        # save frame size for zooming
        if self.__crop_n_zoom and self.__frame_size is None:
            self.__frame_size = frame.shape[:2]

        # initiate transformations capturing
        if not self.__frame_queue:
            # for first frame
            previous_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # convert to gray
            previous_gray = self.__clahe.apply(previous_gray)  # optimize gray frame
            # track features using GFTT
            self.__previous_keypoints = cv2.goodFeaturesToTrack(
                previous_gray,
                maxCorners=200,
                qualityLevel=0.05,
                minDistance=30.0,
                blockSize=3,
                mask=None,
                useHarrisDetector=False,
                k=0.04,
            )
            # save input frame height and width
            self.__frame_height, self.frame_width = frame.shape[:2]
            self.__frame_queue.append(frame)  # save frame to deque
            self.__frame_queue_indexes.append(0)  # save frame index to deque
            self.__previous_gray = previous_gray[:]  # save gray frame clone for further processing
        elif self.__frame_queue_indexes[-1] < self.__smoothing_radius - 1:
            # for rest of frames
            self.__frame_queue.append(frame)  # save frame to deque
            self.__frame_queue_indexes.append(self.__frame_queue_indexes[-1] + 1)  # save frame index
            self.__generate_transformations()  # generate transformations
        else:
            # start applying transformations
            self.__frame_queue.append(frame)  # save frame to deque
            self.__frame_queue_indexes.append(self.__frame_queue_indexes[-1] + 1)  # save frame index
            self.__generate_transformations()  # generate transformations

            # calculate smooth path once transformation capturing is completed
            for i in range(3):
                # apply normalized box filter to the path
                self.__smoothed_path[:, i] = self.__box_filter_convolve(
                    (self.__path[:, i]), window_size=self.__smoothing_radius
                )
            # calculate deviation of path from smoothed path
            deviation = self.__smoothed_path - self.__path
            # save smoothed transformation
            self.__frame_transforms_smoothed = self.frame_transform + deviation
            # return transformation applied stabilized frame
            return self.__apply_transformations()

    def __generate_transformations(self):
        """
        An internal method that generate previous-to-current transformations [dx,dy,da].
        """
        # retrieve current frame and convert to gray
        frame_gray = cv2.cvtColor(self.__frame_queue[-1], cv2.COLOR_BGR2GRAY)
        frame_gray = self.__clahe.apply(frame_gray)  # optimize it

        transformation = None
        try:
            # calculate optical flow using Lucas-Kanade differential method
            curr_kps, status, _error = cv2.calcOpticalFlowPyrLK(
                self.__previous_gray, frame_gray, self.__previous_keypoints, None
            )

            # select only valid key-points
            valid_curr_kps = curr_kps[status == 1]  # current
            valid_previous_keypoints = self.__previous_keypoints[status == 1]  # previous

            # calculate optimal affine transformation between previous_2_current key-points
            if self.__cv2_version == 3:
                # backward compatibility with OpenCV3
                transformation = cv2.estimateRigidTransform(
                    valid_previous_keypoints, valid_curr_kps, False
                )
            else:
                transformation = cv2.estimateAffinePartial2D(
                    valid_previous_keypoints, valid_curr_kps
                )[0]
        except cv2.error:
            # catch any OpenCV assertion errors and warn user
            logger.warning("Video-Frame is too dark to generate any transformations!")
            transformation = None

        # check if transformation is not None
        if transformation is not None:
            # previous_2_current translation in x direction
            dx = transformation[0, 2]
            # previous_2_current translation in y direction
            dy = transformation[1, 2]
            # previous_2_current rotation in angle
            da = np.arctan2(transformation[1, 0], transformation[0, 0])
        else:
            # otherwise zero it
            dx = dy = da = 0

        # save this transformation
        self.__transforms.append([dx, dy, da])

        # calculate path from cumulative transformations sum
        self.frame_transform = np.array(self.__transforms, dtype="float32")
        self.__path = np.cumsum(self.frame_transform, axis=0)
        # create smoothed path from a copy of path
        self.__smoothed_path = np.copy(self.__path)

        # re-calculate and save GFTT key-points for current gray frame
        self.__previous_keypoints = cv2.goodFeaturesToTrack(
            frame_gray,
            maxCorners=200,
            qualityLevel=0.05,
            minDistance=30.0,
            blockSize=3,
            mask=None,
            useHarrisDetector=False,
            k=0.04,
        )
        # save this gray frame for further processing
        self.__previous_gray = frame_gray[:]

    def __box_filter_convolve(self, path, window_size):
        """
        An internal method that applies *normalized linear box filter* to path w.r.t averaging window

        Parameters:
            * path (numpy.ndarray): a cumulative sum of transformations
            * window_size (int): averaging window size
        """
        # pad path to size of averaging window
        path_padded = np.pad(path, (window_size, window_size), "median")
        # apply linear box filter to path
        path_smoothed = np.convolve(path_padded, self.__box_filter, mode="same")
        # crop the smoothed path to original path
        path_smoothed = path_smoothed[window_size:-window_size]
        # assert if cropping is completed
        assert path.shape == path_smoothed.shape
        # return smoothed path
        return path_smoothed

    def __apply_transformations(self):
        """
        An internal method that applies affine transformation to the given frame
        from previously calculated transformations
        """
        # extract frame and its index from deque
        queue_frame = self.__frame_queue.popleft()
        queue_frame_index = self.__frame_queue_indexes.popleft()

        # create border around extracted frame w.r.t border_size
        bordered_frame = cv2.copyMakeBorder(
            queue_frame,
            top=self.__border_size,
            bottom=self.__border_size,
            left=self.__border_size,
            right=self.__border_size,
            borderType=self.__border_mode,
            value=[0, 0, 0],
        )
        # create alpha channel
        alpha_bordered_frame = cv2.cvtColor(bordered_frame, cv2.COLOR_BGR2BGRA)
        # extract alpha channel: opaque only inside the original (un-bordered) frame region
        alpha_bordered_frame[:, :, 3] = 0
        alpha_bordered_frame[
            self.__border_size : self.__border_size + self.__frame_height,
            self.__border_size : self.__border_size + self.frame_width,
            3,
        ] = 255

        # extracting Transformations w.r.t frame index
        dx = self.__frame_transforms_smoothed[queue_frame_index, 0]  # x-axis
        dy = self.__frame_transforms_smoothed[queue_frame_index, 1]  # y-axis
        da = self.__frame_transforms_smoothed[queue_frame_index, 2]  # angle

        # building 2x3 transformation matrix from extracted transformations
        queue_frame_transform = np.zeros((2, 3), np.float32)
        queue_frame_transform[0, 0] = np.cos(da)
        queue_frame_transform[0, 1] = -np.sin(da)
        queue_frame_transform[1, 0] = np.sin(da)
        queue_frame_transform[1, 1] = np.cos(da)
        queue_frame_transform[0, 2] = dx
        queue_frame_transform[1, 2] = dy

        # Applying an affine transformation to the frame
        frame_wrapped = cv2.warpAffine(
            alpha_bordered_frame,
            queue_frame_transform,
            alpha_bordered_frame.shape[:2][::-1],
            borderMode=self.__border_mode,
        )

        # drop alpha channel
        frame_stabilized = frame_wrapped[:, :, :3]

        # crop and zoom
        if self.__crop_n_zoom:
            # crop stabilized frame
            frame_cropped = frame_stabilized[
                self.__crop_n_zoom : -self.__crop_n_zoom,
                self.__crop_n_zoom : -self.__crop_n_zoom,
            ]
            # zoom stabilized frame back to original size
            frame_stabilized = cv2.resize(
                frame_cropped,
                self.__frame_size[::-1],
                interpolation=self.__interpolation,
            )

        # finally return stabilized frame
        return frame_stabilized

    def clean(self) -> None:
        """
        Cleans Stabilizer resources
        """
        # check if deque present
        if self.__frame_queue:
            # clear frame deque
            self.__frame_queue.clear()
            # clear frame indexes deque
            self.__frame_queue_indexes.clear()
def __init__(
    self,
    smoothing_radius: int = 25,
    border_type: str = "black",
    border_size: int = 0,
    crop_n_zoom: bool = False,
    logging: bool = False,
):
    """
    This constructor method initializes the object state and attributes of the Stabilizer class.

    Parameters:
        smoothing_radius (int): alter averaging window size.
        border_type (str): changes the extended border type.
        border_size (int): enables and set the value for extended border size to reduce the black borders.
        crop_n_zoom (bool): enables cropping and zooming of frames(to original size) to reduce the black borders.
        logging (bool): enables/disables logging.
    """
    # only accept a genuine bool for logging; anything else disables it
    self.__logging = logging if isinstance(logging, bool) else False
    # print current version
    logcurr_vidgear_ver(logging=self.__logging)

    # bounded deques holding the pending frames and their running indexes
    self.__frame_queue = deque(maxlen=smoothing_radius)
    self.__frame_queue_indexes = deque(maxlen=smoothing_radius)

    # Adaptive histogram equalization (AHE) object used to optimize gray frames
    self.__clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

    # averaging window: trades latency/sudden-panning tolerance for stabilization quality
    self.__smoothing_radius = smoothing_radius
    # smoothed path produced by the box filter
    self.__smoothed_path = None
    # cumulative sum of previous-to-current transformations along each axis
    self.__path = None
    # list of previous-to-current transformations [dx, dy, da]
    self.__transforms = []
    # smoothed per-frame transformation array
    self.__frame_transforms_smoothed = None
    # last processed gray frame
    self.__previous_gray = None
    # GFTT key-points detected on the last gray frame
    self.__previous_keypoints = None
    # input frame dimensions (frame_width deliberately public)
    self.__frame_height, self.frame_width = (0, 0)
    # cropping/zooming margin used to hide black stabilization borders
    self.__crop_n_zoom = 0

    if crop_n_zoom and border_size:
        # cropping mode: reuse border_size as the crop margin and drop the border
        self.__crop_n_zoom = border_size
        self.__border_size = 0
        self.__frame_size = None  # original frame size, captured lazily for zooming
        if self.__logging:
            logger.debug("Setting Cropping margin {} pixels".format(border_size))
    else:
        # bordered mode: extend output frame by border_size on every side
        self.__border_size = border_size
        if self.__logging and border_size:
            logger.debug("Setting Border size {} pixels".format(border_size))

    # mapping of supported border type names to OpenCV border modes
    supported_modes = {
        "black": cv2.BORDER_CONSTANT,
        "reflect": cv2.BORDER_REFLECT,
        "reflect_101": cv2.BORDER_REFLECT_101,
        "replicate": cv2.BORDER_REPLICATE,
        "wrap": cv2.BORDER_WRAP,
    }
    if border_type in ["black", "reflect", "reflect_101", "replicate", "wrap"]:
        if crop_n_zoom:
            # border types other than "black" are meaningless while cropping
            if self.__logging and border_type != "black":
                logger.debug("Setting border type is disabled if cropping is enabled!")
            self.__border_mode = supported_modes["black"]
        else:
            self.__border_mode = supported_modes[border_type]
            if self.__logging and border_type != "black":
                logger.info("Setting Border type: {}".format(border_type))
    else:
        # unknown type: warn and fall back to the default mode
        if self.__logging:
            logger.debug("Invalid input border type!")
        self.__border_mode = supported_modes["black"]

    # cache OpenCV major version for backward-compatible API selection
    self.__cv2_version = check_CV_version()

    # pick the best interpolation flag available in this OpenCV build
    self.__interpolation = retrieve_best_interpolation(
        ["INTER_LINEAR_EXACT", "INTER_LINEAR", "INTER_AREA"]
    )

    # normalized box filter kernel for path smoothing
    self.__box_filter = np.ones(smoothing_radius) / smoothing_radius
def stabilize(self, frame: np.ndarray) -> Optional[np.ndarray]:
    """
    This method takes an unstabilized video frame, and returns a stabilized one.

    Parameters:
        frame (numpy.ndarray): inputs unstabilized video frames.

    **Returns:** A stabilized video frame, or None while the internal queue is still filling.
    """
    # check if frame is None
    if frame is None:
        # return if it does
        return

    # save frame size for zooming
    # FIX: identity check (`is None`) instead of equality against None
    if self.__crop_n_zoom and self.__frame_size is None:
        self.__frame_size = frame.shape[:2]

    # initiate transformations capturing
    if not self.__frame_queue:
        # for first frame
        previous_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # convert to gray
        previous_gray = self.__clahe.apply(previous_gray)  # optimize gray frame
        # track features using GFTT
        self.__previous_keypoints = cv2.goodFeaturesToTrack(
            previous_gray,
            maxCorners=200,
            qualityLevel=0.05,
            minDistance=30.0,
            blockSize=3,
            mask=None,
            useHarrisDetector=False,
            k=0.04,
        )
        # save input frame height and width
        self.__frame_height, self.frame_width = frame.shape[:2]
        self.__frame_queue.append(frame)  # save frame to deque
        self.__frame_queue_indexes.append(0)  # save frame index to deque
        self.__previous_gray = previous_gray[:]  # save gray frame clone for further processing
    elif self.__frame_queue_indexes[-1] < self.__smoothing_radius - 1:
        # for rest of frames
        self.__frame_queue.append(frame)  # save frame to deque
        self.__frame_queue_indexes.append(self.__frame_queue_indexes[-1] + 1)  # save frame index
        self.__generate_transformations()  # generate transformations
    else:
        # start applying transformations
        self.__frame_queue.append(frame)  # save frame to deque
        self.__frame_queue_indexes.append(self.__frame_queue_indexes[-1] + 1)  # save frame index
        self.__generate_transformations()  # generate transformations

        # calculate smooth path once transformation capturing is completed
        for i in range(3):
            # apply normalized box filter to the path
            self.__smoothed_path[:, i] = self.__box_filter_convolve(
                (self.__path[:, i]), window_size=self.__smoothing_radius
            )
        # calculate deviation of path from smoothed path
        deviation = self.__smoothed_path - self.__path
        # save smoothed transformation
        self.__frame_transforms_smoothed = self.frame_transform + deviation
        # return transformation applied stabilized frame
        return self.__apply_transformations()