CamGear supports a diverse range of video streams which can handle/control video stream almost any IP/USB Cameras, multimedia video file format (upto 4k tested), any network stream URL such as http(s), rtp, rtsp, rtmp, mms, etc. It also supports Gstreamer's RAW pipelines.
CamGear API provides a flexible, high-level multi-threaded wrapper around OpenCV's VideoCapture API with direct access to almost all of its available parameters. It relies on Threaded Queue mode for threaded, error-free and synchronized frame handling.
CamGear internally implements yt_dlp backend class for seamlessly pipelining live video-frames and metadata from various streaming services like YouTube, Dailymotion, Twitch, and many more ➶
classCamGear:""" CamGear supports a diverse range of video streams which can handle/control video stream almost any IP/USB Cameras, multimedia video file format (upto 4k tested), any network stream URL such as http(s), rtp, rtsp, rtmp, mms, etc. It also supports Gstreamer's RAW pipelines. CamGear API provides a flexible, high-level multi-threaded wrapper around OpenCV's VideoCapture API with direct access to almost all of its available parameters. It relies on Threaded Queue mode for threaded, error-free and synchronized frame handling. CamGear internally implements `yt_dlp` backend class for seamlessly pipelining live video-frames and metadata from various streaming services like YouTube, Dailymotion, Twitch, and [many more ➶](https://github.com/yt-dlp/yt-dlp/blob/master/supportedsites.md#supported-sites) """def__init__(self,source:Any=0,stream_mode:bool=False,backend:int=0,colorspace:str=None,logging:bool=False,time_delay:int=0,**options:dict):""" This constructor method initializes the object state and attributes of the CamGear class. Parameters: source (based on input): defines the source for the input stream. stream_mode (bool): controls the exclusive **Stream Mode** for handling streaming URLs. backend (int): selects the backend for OpenCV's VideoCapture class. colorspace (str): selects the colorspace of the input stream. logging (bool): enables/disables logging. time_delay (int): time delay (in sec) before start reading the frames. options (dict): provides ability to alter Source Tweak Parameters. """# enable logging if specifiedself.__logging=loggingifisinstance(logging,bool)elseFalse# print current versionlogcurr_vidgear_ver(logging=self.__logging)# initialize globalself.ytv_metadata={}# check if Stream-Mode is ON (True)ifstream_mode:# TODO: check GStreamer backend support# gst_support = check_gstreamer_support(logging=self.__logging)# handle special Stream Mode parametersstream_resolution=get_supported_resolution(options.pop("STREAM_RESOLUTION","best"),logging=self.__logging)# handle Stream-Modeifnot(yt_dlpisNone):# extract user-defined paramsyt_stream_params=options.pop("STREAM_PARAMS",{})ifisinstance(yt_stream_params,dict):yt_stream_params={str(k).strip():vfork,vinyt_stream_params.items()}else:yt_stream_params={}try:# Validate source for Yt_dlp backendlogger.info("Verifying Streaming URL using yt-dlp backend. Please wait...")# initialize YT_backendytbackend=YT_backend(source_url=source,logging=self.__logging,**yt_stream_params)ifytbackend:# save video metadataself.ytv_metadata=ytbackend.meta_data# handle live-streams# Throw warning for livestreamsytbackend.is_livestreamandlogger.warning("Livestream URL detected. It is strongly recommended to use the GStreamer backend (`backend=cv2.CAP_GSTREAMER`) with these URLs.")# check whether stream-resolution was specified and availableifnot(stream_resolutioninytbackend.streams.keys()):logger.warning("Specified stream-resolution `{}` is not available. Reverting to `best`!".format(stream_resolution))# revert to beststream_resolution="best"else:self.__loggingandlogger.debug("Using `{}` resolution for streaming.".format(stream_resolution))# extract stream URL as source using stream-resolutionsource=ytbackend.streams[stream_resolution]# log progressself.__loggingandlogger.debug("YouTube source ID: `{}`, Title: `{}`, Quality: `{}`".format(self.ytv_metadata["id"],self.ytv_metadata["title"],stream_resolution,))exceptExceptionase:# raise error if something went wrongraiseValueError("[CamGear:ERROR] :: Stream Mode is enabled but Input URL is invalid!")else:# raise import errorsimport_dependency_safe("yt_dlp")# assigns special parameter to global variable and clear# Threaded Queue Modeself.__threaded_queue_mode=options.pop("THREADED_QUEUE_MODE",True)ifnotisinstance(self.__threaded_queue_mode,bool):# reset improper valuesself.__threaded_queue_mode=True# Thread Timeoutself.__thread_timeout=options.pop("THREAD_TIMEOUT",None)ifself.__thread_timeoutandisinstance(self.__thread_timeout,(int,float)):# set valuesself.__thread_timeout=float(self.__thread_timeout)else:# defaults to 5mins timeoutself.__thread_timeout=Noneself.__queue=None# initialize queue for video files onlyifself.__threaded_queue_modeandisinstance(source,str):# define queue and assign it to global varself.__queue=queue.Queue(maxsize=96)# max bufferlen 96 to check overflow# log itself.__loggingandlogger.debug("Enabling Threaded Queue Mode for the current video source!")else:# otherwise disable itself.__threaded_queue_mode=False# log itself.__loggingandlogger.warning("Threaded Queue Mode is disabled for the current video source!")self.__thread_timeoutandlogger.info("Setting Video-Thread Timeout to {}s.".format(self.__thread_timeout))# stream variable initializationself.stream=Noneifbackendandisinstance(backend,int):# add backend if specified and initialize the camera streamifcheck_CV_version()==3:# Different OpenCV 3.4.x statementself.stream=cv2.VideoCapture(source+backend)else:# Two parameters are available since OpenCV 4+ (master branch)self.stream=cv2.VideoCapture(source,backend)logger.info("Setting backend `{}` for this source.".format(backend))else:# initialize the camera streamself.stream=cv2.VideoCapture(source)# initializing colorspace variableself.color_space=None# apply attributes to source if specifiedoptions={str(k).strip():vfork,vinoptions.items()}forkey,valueinoptions.items():property=capPropId(key)not(propertyisNone)andself.stream.set(property,value)# handle colorspace valueifnot(colorspaceisNone):self.color_space=capPropId(colorspace.strip())self.__loggingandnot(self.color_spaceisNone)andlogger.debug("Enabling `{}` colorspace for this video stream!".format(colorspace.strip()))# initialize and assign frame-rate variableself.framerate=0.0_fps=self.stream.get(cv2.CAP_PROP_FPS)if_fps>1.0:self.framerate=_fps# applying time delay to warm-up webcam only if specifiedtime_delayandisinstance(time_delay,(int,float))andtime.sleep(time_delay)# frame variable initialization(grabbed,self.frame)=self.stream.read()# check if valid streamifgrabbed:# render colorspace if definedifnot(self.color_spaceisNone):self.frame=cv2.cvtColor(self.frame,self.color_space)# initialize and append to queueself.__threaded_queue_modeandself.__queue.put(self.frame)else:raiseRuntimeError("[CamGear:ERROR] :: Source is invalid, CamGear failed to initialize stream on this source!")# thread initializationself.__thread=None# initialize termination flag eventself.__terminate=Event()# initialize stream read flag eventself.__stream_read=Event()defstart(self)->T:""" Launches the internal *Threaded Frames Extractor* daemon. **Returns:** A reference to the CamGear class object. """self.__thread=Thread(target=self.__update,name="CamGear",args=())self.__thread.daemon=Trueself.__thread.start()returnselfdef__update(self):""" A **Threaded Frames Extractor**, that keep iterating frames from OpenCV's VideoCapture API to a internal monitored queue, until the thread is terminated, or frames runs out. """# keep iterating infinitely# until the thread is terminated# or frames runs out# if the thread indicator variable is set, stop the threadwhilenotself.__terminate.is_set():# stream not read yetself.__stream_read.clear()# otherwise, read the next frame from the stream(grabbed,frame)=self.stream.read()# stream read completedself.__stream_read.set()# check for valid frame if receivedifnotgrabbed:# no frames received, then safely exitifself.__threaded_queue_mode:ifself.__queue.empty():breakelse:continueelse:break# apply colorspace to frames if validifnot(self.color_spaceisNone):# apply colorspace to framescolor_frame=Nonetry:color_frame=cv2.cvtColor(frame,self.color_space)exceptExceptionase:# Catch if any error occurredcolor_frame=Noneself.color_space=Noneself.__loggingandlogger.exception(str(e))logger.warning("Assigned colorspace value is invalid. Discarding!")self.frame=color_frameifnot(color_frameisNone)elseframeelse:self.frame=frame# append to queueself.__threaded_queue_modeandself.__queue.put(self.frame)# signal queue we're doneself.__threaded_queue_modeandself.__queue.put(None)self.__threaded_queue_mode=False# indicate immediate terminationself.__terminate.set()self.__stream_read.set()# release resourcesself.stream.release()defread(self)->Optional[NDArray]:""" Extracts frames synchronously from monitored queue, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the queue is full. **Returns:** A n-dimensional numpy array. """whileself.__threaded_queue_modeandnotself.__terminate.is_set():returnself.__queue.get(timeout=self.__thread_timeout)# return current frame# only after stream is readreturn(self.frameifnotself.__terminate.is_set()# check if already terminatedandself.__stream_read.wait(timeout=self.__thread_timeout)# wait for itelseNone)defstop(self)->None:""" Safely terminates the thread, and release the multi-threaded resources. """self.__loggingandlogger.debug("Terminating processes.")# terminate Threaded queue mode separatelyself.__threaded_queue_mode=False# indicate that the thread# should be terminated immediatelyself.__stream_read.set()self.__terminate.set()# wait until stream resources are released (producer thread might be still grabbing frame)ifself.__threadisnotNone:ifnot(self.__queueisNone):whilenotself.__queue.empty():try:self.__queue.get_nowait()exceptqueue.Empty:continueself.__queue.task_done()self.__thread.join()
def__init__(self,source:Any=0,stream_mode:bool=False,backend:int=0,colorspace:str=None,logging:bool=False,time_delay:int=0,**options:dict):""" This constructor method initializes the object state and attributes of the CamGear class. Parameters: source (based on input): defines the source for the input stream. stream_mode (bool): controls the exclusive **Stream Mode** for handling streaming URLs. backend (int): selects the backend for OpenCV's VideoCapture class. colorspace (str): selects the colorspace of the input stream. logging (bool): enables/disables logging. time_delay (int): time delay (in sec) before start reading the frames. options (dict): provides ability to alter Source Tweak Parameters. """# enable logging if specifiedself.__logging=loggingifisinstance(logging,bool)elseFalse# print current versionlogcurr_vidgear_ver(logging=self.__logging)# initialize globalself.ytv_metadata={}# check if Stream-Mode is ON (True)ifstream_mode:# TODO: check GStreamer backend support# gst_support = check_gstreamer_support(logging=self.__logging)# handle special Stream Mode parametersstream_resolution=get_supported_resolution(options.pop("STREAM_RESOLUTION","best"),logging=self.__logging)# handle Stream-Modeifnot(yt_dlpisNone):# extract user-defined paramsyt_stream_params=options.pop("STREAM_PARAMS",{})ifisinstance(yt_stream_params,dict):yt_stream_params={str(k).strip():vfork,vinyt_stream_params.items()}else:yt_stream_params={}try:# Validate source for Yt_dlp backendlogger.info("Verifying Streaming URL using yt-dlp backend. Please wait...")# initialize YT_backendytbackend=YT_backend(source_url=source,logging=self.__logging,**yt_stream_params)ifytbackend:# save video metadataself.ytv_metadata=ytbackend.meta_data# handle live-streams# Throw warning for livestreamsytbackend.is_livestreamandlogger.warning("Livestream URL detected. It is strongly recommended to use the GStreamer backend (`backend=cv2.CAP_GSTREAMER`) with these URLs.")# check whether stream-resolution was specified and availableifnot(stream_resolutioninytbackend.streams.keys()):logger.warning("Specified stream-resolution `{}` is not available. Reverting to `best`!".format(stream_resolution))# revert to beststream_resolution="best"else:self.__loggingandlogger.debug("Using `{}` resolution for streaming.".format(stream_resolution))# extract stream URL as source using stream-resolutionsource=ytbackend.streams[stream_resolution]# log progressself.__loggingandlogger.debug("YouTube source ID: `{}`, Title: `{}`, Quality: `{}`".format(self.ytv_metadata["id"],self.ytv_metadata["title"],stream_resolution,))exceptExceptionase:# raise error if something went wrongraiseValueError("[CamGear:ERROR] :: Stream Mode is enabled but Input URL is invalid!")else:# raise import errorsimport_dependency_safe("yt_dlp")# assigns special parameter to global variable and clear# Threaded Queue Modeself.__threaded_queue_mode=options.pop("THREADED_QUEUE_MODE",True)ifnotisinstance(self.__threaded_queue_mode,bool):# reset improper valuesself.__threaded_queue_mode=True# Thread Timeoutself.__thread_timeout=options.pop("THREAD_TIMEOUT",None)ifself.__thread_timeoutandisinstance(self.__thread_timeout,(int,float)):# set valuesself.__thread_timeout=float(self.__thread_timeout)else:# defaults to 5mins timeoutself.__thread_timeout=Noneself.__queue=None# initialize queue for video files onlyifself.__threaded_queue_modeandisinstance(source,str):# define queue and assign it to global varself.__queue=queue.Queue(maxsize=96)# max bufferlen 96 to check overflow# log itself.__loggingandlogger.debug("Enabling Threaded Queue Mode for the current video source!")else:# otherwise disable itself.__threaded_queue_mode=False# log itself.__loggingandlogger.warning("Threaded Queue Mode is disabled for the current video source!")self.__thread_timeoutandlogger.info("Setting Video-Thread Timeout to {}s.".format(self.__thread_timeout))# stream variable initializationself.stream=Noneifbackendandisinstance(backend,int):# add backend if specified and initialize the camera streamifcheck_CV_version()==3:# Different OpenCV 3.4.x statementself.stream=cv2.VideoCapture(source+backend)else:# Two parameters are available since OpenCV 4+ (master branch)self.stream=cv2.VideoCapture(source,backend)logger.info("Setting backend `{}` for this source.".format(backend))else:# initialize the camera streamself.stream=cv2.VideoCapture(source)# initializing colorspace variableself.color_space=None# apply attributes to source if specifiedoptions={str(k).strip():vfork,vinoptions.items()}forkey,valueinoptions.items():property=capPropId(key)not(propertyisNone)andself.stream.set(property,value)# handle colorspace valueifnot(colorspaceisNone):self.color_space=capPropId(colorspace.strip())self.__loggingandnot(self.color_spaceisNone)andlogger.debug("Enabling `{}` colorspace for this video stream!".format(colorspace.strip()))# initialize and assign frame-rate variableself.framerate=0.0_fps=self.stream.get(cv2.CAP_PROP_FPS)if_fps>1.0:self.framerate=_fps# applying time delay to warm-up webcam only if specifiedtime_delayandisinstance(time_delay,(int,float))andtime.sleep(time_delay)# frame variable initialization(grabbed,self.frame)=self.stream.read()# check if valid streamifgrabbed:# render colorspace if definedifnot(self.color_spaceisNone):self.frame=cv2.cvtColor(self.frame,self.color_space)# initialize and append to queueself.__threaded_queue_modeandself.__queue.put(self.frame)else:raiseRuntimeError("[CamGear:ERROR] :: Source is invalid, CamGear failed to initialize stream on this source!")# thread initializationself.__thread=None# initialize termination flag eventself.__terminate=Event()# initialize stream read flag eventself.__stream_read=Event()
Extracts frames synchronously from monitored queue, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the queue is full.
defread(self)->Optional[NDArray]:""" Extracts frames synchronously from monitored queue, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the queue is full. **Returns:** A n-dimensional numpy array. """whileself.__threaded_queue_modeandnotself.__terminate.is_set():returnself.__queue.get(timeout=self.__thread_timeout)# return current frame# only after stream is readreturn(self.frameifnotself.__terminate.is_set()# check if already terminatedandself.__stream_read.wait(timeout=self.__thread_timeout)# wait for itelseNone)
defstart(self)->T:""" Launches the internal *Threaded Frames Extractor* daemon. **Returns:** A reference to the CamGear class object. """self.__thread=Thread(target=self.__update,name="CamGear",args=())self.__thread.daemon=Trueself.__thread.start()returnself
defstop(self)->None:""" Safely terminates the thread, and release the multi-threaded resources. """self.__loggingandlogger.debug("Terminating processes.")# terminate Threaded queue mode separatelyself.__threaded_queue_mode=False# indicate that the thread# should be terminated immediatelyself.__stream_read.set()self.__terminate.set()# wait until stream resources are released (producer thread might be still grabbing frame)ifself.__threadisnotNone:ifnot(self.__queueisNone):whilenotself.__queue.empty():try:self.__queue.get_nowait()exceptqueue.Empty:continueself.__queue.task_done()self.__thread.join()