PiGear implements a seamless and robust wrapper around the picamera2 Python library, simplifying integration with minimal code changes and ensuring a smooth transition for developers already familiar with the Picamera2 API. PiGear leverages the libcamera API under the hood with multi-threading, providing high performance along with enhanced control and functionality for Raspberry Pi camera modules.
PiGear handles common configuration parameters and non-standard settings for various camera types, simplifying the integration process. PiGear currently supports picamera2 API parameters such as sensor, controls, transform, and stride, with internal type and sanity checks for robust performance.
While primarily focused on Raspberry Pi camera modules, PiGear also provides basic functionality for USB webcams (with the Picamera2 API only), and it uses camera metadata to distinguish USB cameras from Raspberry Pi cameras.
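The USB check and the per-camera option filtering described above can be sketched in plain Python. This is a minimal illustration rather than PiGear's actual code path: the helper names `is_usb_camera` and `filter_options` are hypothetical, and the two metadata dictionaries are made-up samples shaped like the entries the source reads from `Picamera2.global_camera_info()` (keys `Num`, `Model`, `Id`). The option names are the ones listed in the source.

```python
from typing import Any, Dict, List, Tuple

# Option names taken from the PiGear source; the helpers below are hypothetical.
COMMON_OPTIONS = ["auto_align_output_size", "enable_verbose_logs", "format", "sensor"]
NON_USB_OPTIONS = ["controls", "transform", "buffer_count", "queue"]


def is_usb_camera(metadata: Dict[str, Any]) -> bool:
    """Treat a camera as USB when its `Id` string contains "usb"."""
    return "usb" in metadata["Id"]


def filter_options(
    options: Dict[str, Any], is_usb: bool
) -> Tuple[Dict[str, Any], List[str]]:
    """Return (kept options, sorted names of discarded options)."""
    valid = COMMON_OPTIONS if is_usb else COMMON_OPTIONS + NON_USB_OPTIONS
    kept = {k: v for k, v in options.items() if k in valid}
    dropped = sorted(k for k in options if k not in valid)
    return kept, dropped


# Made-up sample metadata, for illustration only.
csi_camera = {"Num": 0, "Model": "imx219", "Id": "/base/soc/i2c0mux/i2c@1/imx219@10"}
usb_camera = {"Num": 1, "Model": "webcam", "Id": "/base/scb/pcie@7d500000/usb@0/1.2"}

options = {"format": "RGB888", "queue": True, "zoom": 2}
kept_csi, dropped_csi = filter_options(options, is_usb_camera(csi_camera))
kept_usb, dropped_usb = filter_options(options, is_usb_camera(usb_camera))
```

With these samples, the Raspberry Pi camera keeps `format` and `queue`, while the USB camera keeps only `format`; the unknown `zoom` key is dropped for both.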
Backward compatibility with picamera library
PiGear automatically switches to the legacy picamera library if the picamera2 library is unavailable, preserving backward compatibility. For this, PiGear also provides a flexible multi-threaded framework around the complete picamera API, allowing developers to use a wide range of parameters, such as brightness, saturation, sensor_mode, iso, exposure, and more.
Furthermore, PiGear supports the use of multiple camera modules, including those found on Raspberry Pi Compute Module IO boards and USB cameras (only with Picamera2 API).
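The backend choice described above follows a simple priority order, which can be sketched as a pure function. This is an illustrative sketch, not the library's code: `select_backend` and its boolean arguments are hypothetical stand-ins for the import checks in the source, and raising `ImportError` when no usable library is found is an assumption of the sketch.

```python
def select_backend(
    has_picamera2: bool, has_picamera: bool, enforce_legacy: bool = False
) -> str:
    """Pick a backend name: picamera2 first, legacy picamera as fallback."""
    if enforce_legacy:
        # Mirrors the `enforce_legacy_picamera` option: it needs picamera installed.
        if not has_picamera:
            raise ImportError("legacy picamera requested but not available")
        return "picamera"
    if has_picamera2:
        return "picamera2"
    if has_picamera:
        return "picamera"
    raise ImportError("neither picamera2 nor picamera is available")


default_choice = select_backend(has_picamera2=True, has_picamera=True)
fallback_choice = select_backend(has_picamera2=False, has_picamera=True)
forced_choice = select_backend(True, True, enforce_legacy=True)
```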
Threaded Internal Timer
PiGear releases its resources properly when the API terminates, preventing resource leaks. Internally, PiGear runs a Threaded Internal Timer that silently tracks frozen threads and hardware failures and exits safely if either occurs. For example, if your script is running the PiGear API and someone accidentally pulls out the camera module cable, the API exits safely and frees its resources instead of risking a possible kernel panic.
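The idea behind such a timer is a watchdog: the capture thread records a timestamp whenever it makes progress, and a second thread flags a failure when that timestamp becomes too old. The sketch below illustrates the pattern under stated assumptions; the `Watchdog` class and its method names are hypothetical, and unlike the source listing it uses `time.monotonic()` and `threading.Event.wait()` so the polling loop sleeps between checks.

```python
import threading
import time


class Watchdog:
    """Flags a failure when no heartbeat arrives within `timeout` seconds."""

    def __init__(self, timeout: float = 2.0, poll: float = 0.01) -> None:
        self.timeout = timeout
        self.poll = poll
        self.failed = False
        self._last_beat = time.monotonic()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, name="Watchdog", daemon=True)

    def start(self) -> "Watchdog":
        self._last_beat = time.monotonic()
        self._thread.start()
        return self

    def beat(self) -> None:
        """Called by the worker each time it makes progress (e.g. grabs a frame)."""
        self._last_beat = time.monotonic()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait() doubles as a sleep, so the loop does not spin at full CPU.
        while not self._stop.wait(self.poll):
            if time.monotonic() - self._last_beat > self.timeout:
                self.failed = True  # worker looks frozen
                self._stop.set()  # self-terminate


def demo() -> bool:
    dog = Watchdog(timeout=0.3).start()
    for _ in range(3):  # healthy worker: heartbeats arrive in time
        time.sleep(0.05)
        dog.beat()
    healthy = not dog.failed
    time.sleep(0.6)  # simulated freeze: no heartbeats at all
    frozen_detected = dog.failed
    dog.stop()
    return healthy and frozen_detected
```

In PiGear the equivalent of `failed` is consulted by `read()`, which raises an error once a failure has been recorded.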
class PiGear:
    """
    PiGear implements a seamless and robust wrapper around the [picamera2](https://github.com/raspberrypi/picamera2) python library, simplifying integration with minimal code changes and ensuring a smooth transition for developers already familiar with the Picamera2 API. PiGear leverages the `libcamera` API under the hood with multi-threading, providing high-performance :fire:, enhanced control and functionality for Raspberry Pi camera modules.

    PiGear handles common configuration parameters and non-standard settings for various camera types, simplifying the integration process. PiGear currently supports picamera2 API parameters such as `sensor`, `controls`, `transform`, and `stride`, with internal type and sanity checks for robust performance.

    While primarily focused on Raspberry Pi camera modules, PiGear also provides basic functionality for USB webcams only with Picamera2 API, along with the ability to accurately differentiate between USB and Raspberry Pi cameras using metadata.

    ???+ info "Backward compatibility with `picamera` library"
        PiGear seamlessly switches to the legacy [picamera](https://picamera.readthedocs.io/en/release-1.13/index.html) library if the `picamera2` library is unavailable, ensuring seamless backward compatibility. For this, PiGear also provides a flexible multi-threaded framework around complete `picamera` API, allowing developers to effortlessly exploit a wide range of parameters, such as `brightness`, `saturation`, `sensor_mode`, `iso`, `exposure`, and more.

        Furthermore, PiGear supports the use of multiple camera modules, including those found on Raspberry Pi Compute Module IO boards and USB cameras _(only with Picamera2 API)_.

    ??? new "Threaded Internal Timer :material-camera-timer:"
        PiGear ensures proper resource release during the termination of the API, preventing potential issues or resource leaks. PiGear API internally implements a ==Threaded Internal Timer== that silently keeps active track of any frozen-threads or hardware-failures and exits safely if any do occur. This means that if you're running the PiGear API in your script and someone accidentally pulls the Camera-Module cable out, instead of going into a possible kernel panic, the API will exit safely to save resources.

    !!! failure "Make sure to [enable Raspberry Pi hardware-specific settings](https://picamera.readthedocs.io/en/release-1.13/quickstart.html) prior using this API, otherwise nothing will work."
    """

    def __init__(
        self,
        camera_num: int = 0,
        resolution: Tuple[int, int] = (640, 480),
        framerate: Union[int, float] = 30,
        colorspace: str = None,
        logging: bool = False,
        time_delay: int = 0,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the PiGear class.

        Parameters:
            camera_num (int): selects the camera module index which will be used as source.
            resolution (tuple): sets the resolution (i.e. `(width,height)`) of the source..
            framerate (int/float): sets the framerate of the source.
            colorspace (str): selects the colorspace of the input stream.
            logging (bool): enables/disables logging.
            time_delay (int): time delay (in sec) before start reading the frames.
            options (dict): provides ability to alter Source Tweak Parameters.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        global picamera, picamera2
        if picamera2:
            # log if picamera2
            self.__logging and logger.info("picamera2 API is currently being accessed.")
        elif picamera:
            # switch to picamera otherwise
            logger.critical(
                "picamera2 library not installed on this system. Defaulting to legacy picamera API."
            )
        else:
            # raise error if none
            import_dependency_safe("picamera")

        assert (
            isinstance(framerate, (int, float)) and framerate > 0.0
        ), "[PiGear:ERROR] :: Input framerate value `{}` is a Invalid! Kindly read docs.".format(
            framerate
        )
        assert (
            isinstance(resolution, (tuple, list)) and len(resolution) == 2
        ), "[PiGear:ERROR] :: Input resolution value `{}` is a Invalid! Kindly read docs.".format(
            resolution
        )
        if not (isinstance(camera_num, int) and camera_num >= 0):
            camera_num = 0
            logger.warning(
                "Input camera_num value `{}` is invalid, Defaulting to index 0!"
            )

        # reformat dict
        options = {str(k).strip(): v for k, v in options.items()}

        # check if legacy picamera backend is enforced
        enforce_legacy_picamera = options.pop("enforce_legacy_picamera", False)
        if isinstance(enforce_legacy_picamera, bool) and enforce_legacy_picamera:
            # check if picamera library is available.
            if picamera:
                logger.critical(
                    "Enforcing legacy picamera API for this run. picamera2 API access will be disabled!"
                )
                # disable picamera2
                picamera2 = None
            else:
                # raise error otherwise
                logger.error(
                    "`picamera` is unavailable or unsupported on this system, `enforce_legacy_picamera` will be discarded!"
                )
                import_dependency_safe("picamera")

        if picamera2:
            # handle logging
            not (self.__logging) and not os.getenv(
                "LIBCAMERA_LOG_LEVELS", False
            ) and logger.info(
                "Kindly set `LIBCAMERA_LOG_LEVELS=2` environment variable to disable common libcamera API messages."
            )
            # collect metadata
            cameras_metadata = Picamera2.global_camera_info()
            # initialize the picamera stream at given index
            self.__camera = Picamera2(camera_num=camera_num)
            # extract metadata for current camera
            camera_metadata = [x for x in cameras_metadata if x["Num"] == camera_num][0]
            # check connected camera is USB or I2C
            self.__camera_is_usb = True if "usb" in camera_metadata["Id"] else False
            # handle framerate control
            if not self.__camera_is_usb:
                self.__camera.set_controls({"FrameRate": framerate})
            else:
                logger.warning(
                    "USB camera detected. Setting input framerate is NOT supported with Picamera2 API!"
                )
            # log
            self.__logging and logger.debug(
                "Activating Picamera2 API for `{}` camera at index: {} with resolution: {} & framerate: {}".format(
                    camera_metadata["Model"],
                    camera_num,
                    resolution if not self.__camera_is_usb else "default",
                    framerate,
                )
            )
        else:
            # initialize the picamera stream at given index
            self.__camera = PiCamera(camera_num=camera_num)
            self.__camera.resolution = tuple(resolution)
            self.__camera.framerate = framerate
            self.__logging and logger.debug(
                "Activating Picamera API at index: {} with resolution: {} & framerate: {}".format(
                    camera_num, resolution, framerate
                )
            )

        # initialize framerate (Read-only) variable
        self.framerate = framerate

        # initializing colorspace variable
        self.color_space = None

        # define timeout variable default value(handles hardware failures)
        self.__failure_timeout = options.pop("HWFAILURE_TIMEOUT", 2.0)
        if isinstance(self.__failure_timeout, (int, float)):
            if not (10.0 > self.__failure_timeout > 1.0):
                raise ValueError(
                    "[PiGear:ERROR] :: `HWFAILURE_TIMEOUT` value can only be between 1.0 ~ 10.0"
                )
            self.__logging and logger.debug(
                "Setting HW Failure Timeout: {} seconds".format(
                    self.__failure_timeout
                )
            )
        else:
            # reset improper values
            self.__failure_timeout = 2.0

        try:
            if picamera2:
                # define common supported picamera2 config parameters
                valid_config_options = [
                    "auto_align_output_size",  # internal
                    "enable_verbose_logs",  # internal
                    "format",
                    "sensor",
                ]
                # define non-USB supported picamera2 config parameters
                non_usb_options = [
                    "controls",  # not-supported on USB
                    "transform",  # not-working on USB
                    "buffer_count",  # not-supported on USB
                    "queue",  # not-supported on USB
                ]  # Less are supported (will be changed in future)

                # filter parameter supported with non-USB cameras only
                if self.__camera_is_usb:
                    unsupported_config_keys = set(list(options.keys())).intersection(
                        set(non_usb_options)
                    )
                    unsupported_config_keys and logger.warning(
                        "Setting parameters: `{}` for USB camera is NOT supported with Picamera2 API!".format(
                            "`, `".join(unsupported_config_keys)
                        )
                    )
                else:
                    valid_config_options += non_usb_options

                # log all invalid keys
                invalid_config_keys = set(list(options.keys())) - set(
                    valid_config_options
                )
                invalid_config_keys and logger.warning(
                    "Discarding invalid options NOT supported by Picamera2 API for current Camera Sensor: `{}`".format(
                        "`, `".join(invalid_config_keys)
                    )
                )
                # delete all unsupported options
                options = {
                    x: y for x, y in options.items() if x in valid_config_options
                }

                # setting size, already defined
                options.update({"size": tuple(resolution)})

                # set 24-bit, BGR format by default
                if not "format" in options:
                    # auto defaults for USB cameras
                    not self.__camera_is_usb and options.update({"format": "RGB888"})
                elif self.__camera_is_usb:
                    # check the supported formats, if USB camera
                    avail_formats = [
                        mode["format"] for mode in self.__camera.sensor_modes
                    ]
                    # handle unsupported formats
                    if not options["format"] in avail_formats:
                        logger.warning(
                            "Discarding `format={}`. `{}` are the only available formats for USB camera in use!".format(
                                options["format"], "`, `".join(avail_formats)
                            )
                        )
                        del options["format"]
                    else:
                        # `colorspace` parameter must define with `format` optional parameter
                        # unless format is MPEG (tested)
                        (
                            not (colorspace is None) or options["format"] == "MPEG"
                        ) and logger.warning(
                            "Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
                                options["format"]
                            )
                        )
                else:
                    # `colorspace` parameter must define with `format` optional parameter
                    # unless format is either BGR or BGRA
                    (
                        not (colorspace is None)
                        or options["format"]
                        in [
                            "RGB888",
                            "XRGB8888",
                        ]
                    ) and logger.warning(
                        "Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
                            options["format"]
                        )
                    )

                # enable verbose logging mode (handled by Picamera2 API)
                verbose = options.pop("enable_verbose_logs", False)
                if self.__logging and isinstance(verbose, bool) and verbose:
                    self.__camera.set_logging(Picamera2.DEBUG)
                else:
                    # setup logging
                    self.__camera.set_logging(Picamera2.WARNING)

                # handle transformations, if specified
                transform = options.pop("transform", Transform())
                if not isinstance(transform, Transform):
                    logger.warning("`transform` value is of invalid type, Discarding!")
                    transform = Transform()

                # handle sensor configurations, if specified
                sensor = options.pop("sensor", {})
                if isinstance(sensor, dict):
                    # extract all valid sensor keys
                    valid_sensor = ["output_size", "bit_depth"]
                    # log all invalid keys
                    invalid_sensor_keys = set(list(sensor)) - set(valid_sensor)
                    invalid_sensor_keys and logger.warning(
                        "Discarding sensor properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
                            "`, `".join(invalid_sensor_keys),
                            "`, `".join(valid_sensor),
                        )
                    )
                    # delete all unsupported control keys
                    sensor = {x: y for x, y in sensor.items() if x in valid_sensor}
                    # remove size if output size is defined
                    if "output_size" in sensor:
                        del options["size"]
                        logger.critical(
                            "Overriding output frame size with `output_size={}!".format(
                                sensor["output_size"]
                            )
                        )
                else:
                    logger.warning("`sensor` value is of invalid type, Discarding!")
                    sensor = {}

                # handle controls, if specified
                controls = options.pop("controls", {})
                if isinstance(controls, dict):
                    # extract all valid control keys
                    valid_controls = self.__camera.camera_controls
                    # remove any fps controls, assigned already
                    valid_controls.pop("FrameDuration", None)
                    valid_controls.pop("FrameDurationLimits", None)
                    # log all invalid keys
                    invalid_control_keys = set(list(controls.keys())) - set(
                        list(valid_controls.keys())
                    )
                    invalid_control_keys and logger.warning(
                        "Discarding control properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
                            "`, `".join(invalid_control_keys),
                            "`, `".join(list(valid_controls.keys())),
                        )
                    )
                    # delete all unsupported control keys
                    controls = {
                        x: y for x, y in controls.items() if x in valid_controls.keys()
                    }
                else:
                    logger.warning("`controls` value is of invalid type, Discarding!")
                    controls = {}

                # handle buffer_count, if specified
                buffer_count = options.pop("buffer_count", 4)
                if (
                    not isinstance(buffer_count, int) or buffer_count < 1
                ):  # must be greater than 1
                    logger.warning(
                        "`buffer_count` value is of invalid type, Discarding!"
                    )
                    # `create_preview_configuration` requests 4 sets of buffers
                    buffer_count = 4

                # handle queue, if specified
                queue = options.pop("queue", True)
                if not isinstance(queue, bool):
                    logger.warning("`queue` value is of invalid type, Discarding!")
                    queue = True

                # check if auto-align camera configuration is specified
                auto_align_output_size = options.pop("auto_align_output_size", False)

                # create default configuration for camera
                config = self.__camera.create_preview_configuration(
                    main=options,
                    transform=transform,
                    sensor=sensor,
                    controls=controls,
                    buffer_count=buffer_count,
                    queue=queue,
                )

                # auto-align camera configuration, if specified
                if isinstance(auto_align_output_size, bool) and auto_align_output_size:
                    self.__logging and logger.debug(
                        "Re-aligning Output frames to optimal size supported by current Camera Sensor."
                    )
                    self.__camera.align_configuration(config)

                # configure camera
                self.__camera.configure(config)
                self.__logging and logger.debug(
                    "Setting Picamera2 API Parameters: `{}`, controls: `{}`, sensor: `{}`, buffer_count: `{}`, and queue: `{}`.".format(
                        self.__camera.camera_configuration()["main"],
                        controls,
                        sensor,
                        buffer_count,
                        queue,
                    )
                )
            else:
                # apply attributes to source if specified
                for key, value in options.items():
                    self.__logging and logger.debug(
                        "Setting {} API Parameter for Picamera: `{}`".format(key, value)
                    )
                    setattr(self.__camera, key, value)
        except Exception as e:
            # Catch if any error occurred
            logger.exception(str(e))

        # separately handle colorspace value to int conversion
        if not (colorspace is None):
            self.color_space = capPropId(colorspace.strip())
            if self.__logging and not (self.color_space is None):
                logger.debug(
                    "Enabling `{}` colorspace for this video stream!".format(
                        colorspace.strip()
                    )
                )

        # enable rgb capture array thread and capture stream
        if not picamera2:
            self.__rawCapture = PiRGBArray(self.__camera, size=resolution)
            self.stream = self.__camera.capture_continuous(
                self.__rawCapture, format="bgr", use_video_port=True
            )

        # initialize frame variable
        # with captured frame
        try:
            if picamera2:
                # start camera thread
                self.__camera.start()
                # capture frame array
                self.frame = self.__camera.capture_array("main")
                # assign camera as stream for setting
                # parameters after starting the camera
                self.stream = self.__camera
            else:
                # capture frame array from stream
                stream = next(self.stream)
                self.frame = stream.array
                self.__rawCapture.seek(0)
                self.__rawCapture.truncate()
            # render colorspace if defined
            if not (self.frame is None) and not (self.color_space is None):
                self.frame = cv2.cvtColor(self.frame, self.color_space)
        except Exception as e:
            logger.exception(str(e))
            raise RuntimeError("[PiGear:ERROR] :: Camera Module failed to initialize!")

        # applying time delay to warm-up picamera only if specified
        if time_delay and isinstance(time_delay, (int, float)):
            time.sleep(time_delay)

        # thread initialization
        self.__thread = None

        # timer thread initialization(Keeps check on frozen thread)
        self.__timer = None
        self.__t_elapsed = 0.0  # records time taken by thread

        # catching thread exceptions
        self.__exceptions = None

        # initialize termination flag
        self.__terminate = False

    def start(self) -> PIGear:
        """
        Launches the internal *Threaded Frames Extractor* daemon

        **Returns:** A reference to the PiGear class object.
        """
        # Start frame producer thread
        self.__thread = Thread(target=self.__update, name="PiGear", args=())
        self.__thread.daemon = True
        self.__thread.start()

        # Start internal timer thread
        self.__timer = Thread(target=self.__timeit, name="PiTimer", args=())
        self.__timer.daemon = True
        self.__timer.start()

        return self

    def __timeit(self):
        """
        Threaded Internal Timer that keep checks on thread execution timing
        """
        # assign current time
        self.__t_elapsed = time.time()

        # loop until terminated
        while not (self.__terminate):
            # check for frozen thread
            if time.time() - self.__t_elapsed > self.__failure_timeout:
                # log failure
                self.__logging and logger.critical("Camera Module Disconnected!")
                # prepare for clean exit
                self.__exceptions = True
                self.__terminate = True  # self-terminate

    def __update(self):
        """
        A **Threaded Frames Extractor**, that keep iterating frames from PiCamera API to a internal monitored deque,
        until the thread is terminated, or frames runs out.
        """
        # keep looping infinitely until the thread is terminated
        while not (self.__terminate):
            if not picamera2:
                try:
                    # Try to iterate next frame from generator
                    stream = next(self.stream)
                except Exception:
                    # catch and save any exceptions
                    self.__exceptions = sys.exc_info()
                    break  # exit

            # __update timer
            self.__t_elapsed = time.time()

            # grab the frame from the stream
            if picamera2:
                frame = self.__camera.capture_array("main")
            else:
                frame = stream.array
                # clear the stream in preparation
                # for the next frame
                self.__rawCapture.seek(0)
                self.__rawCapture.truncate()

            # apply colorspace if specified
            if not (self.color_space is None):
                # apply colorspace to frames
                color_frame = None
                try:
                    color_frame = cv2.cvtColor(frame, self.color_space)
                except Exception as e:
                    # Catch if any error occurred
                    color_frame = None
                    self.color_space = None
                    self.__logging and logger.exception(str(e))
                    logger.warning("Assigned colorspace value is invalid. Discarding!")
                self.frame = color_frame if not (color_frame is None) else frame
            else:
                self.frame = frame

        # terminate processes
        if not (self.__terminate):
            self.__terminate = True

        # release resources
        if picamera2:
            self.__camera.stop()
        else:
            self.__rawCapture.close()
            self.__camera.close()

    def read(self) -> NDArray:
        """
        Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
        and blocks the thread if the deque is full.

        **Returns:** A n-dimensional numpy array.
        """
        # check if there are any thread exceptions
        if not (self.__exceptions is None):
            if isinstance(self.__exceptions, bool):
                # clear frame
                self.frame = None
                # notify user about hardware failure
                raise SystemError(
                    "[PiGear:ERROR] :: Hardware failure occurred, Kindly reconnect Camera Module and restart your Pi!"
                )
            else:
                # clear frame
                self.frame = None
                # re-raise error for debugging
                error_msg = (
                    "[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
                        self.__exceptions[1]
                    )
                )
                raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])

        # return the frame
        return self.frame

    def stop(self) -> None:
        """
        Safely terminates the thread, and release the multi-threaded resources.
        """
        # log termination
        self.__logging and logger.debug("Terminating PiGear Processes.")

        # make sure that the threads should be terminated
        self.__terminate = True

        # stop timer thread
        if not (self.__timer is None):
            self.__timer.join()
            self.__timer = None

        # handle camera thread
        if not (self.__thread is None):
            # check if hardware failure occurred
            if not (self.__exceptions is None) and isinstance(self.__exceptions, bool):
                if picamera2:
                    # release picamera2 resources
                    self.__camera.stop()
                else:
                    # force release picamera resources
                    self.__rawCapture.close()
                    self.__camera.close()
            # properly handle thread exit
            # wait if still process is still
            # processing some information
            self.__thread.join()
            # remove any threads
            self.__thread = None
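The constructor above accepts a `HWFAILURE_TIMEOUT` option that sets how long the internal timer waits before declaring a hardware failure. The sketch below isolates that validation rule as a standalone function. The function name `resolve_failure_timeout` is hypothetical, and returning a `float` in every case is a choice of this sketch rather than something the source does.

```python
DEFAULT_TIMEOUT = 2.0


def resolve_failure_timeout(options: dict) -> float:
    """Pop `HWFAILURE_TIMEOUT` from `options` and validate it.

    Numeric values must lie strictly between 1.0 and 10.0 seconds;
    non-numeric values silently fall back to the 2.0 second default.
    """
    value = options.pop("HWFAILURE_TIMEOUT", DEFAULT_TIMEOUT)
    if isinstance(value, (int, float)):
        if not (10.0 > value > 1.0):
            raise ValueError(
                "`HWFAILURE_TIMEOUT` value can only be between 1.0 ~ 10.0"
            )
        return float(value)
    return DEFAULT_TIMEOUT


user_options = {"HWFAILURE_TIMEOUT": 5, "format": "RGB888"}
timeout = resolve_failure_timeout(user_options)
fallback = resolve_failure_timeout({"HWFAILURE_TIMEOUT": "3"})
```

Note that both bounds are exclusive, so `1.0` and `10.0` themselves are rejected, and that the key is removed from the options dictionary before the remaining options are processed.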
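`read()` returns the most recent frame captured by the background thread as an n-dimensional NumPy array. If the internal timer has flagged a hardware failure, or the capture thread has recorded an exception, it clears the stored frame and raises an error instead.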
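In the source listing, `read()` first checks what the background threads recorded and only then hands back a frame. A minimal sketch of that dispatch is shown below; `read_frame` and `capture_failure` are hypothetical helpers, and the error messages are shortened stand-ins for the ones in the source.

```python
import sys
from typing import Any, Optional


def read_frame(frame: Any, recorded: Optional[Any]) -> Any:
    """Return `frame`, or raise according to what the worker threads recorded."""
    if recorded is None:
        return frame
    if isinstance(recorded, bool):
        # The timer thread stores a plain boolean when it detects a freeze.
        raise SystemError("hardware failure: camera stopped delivering frames")
    # Otherwise `recorded` is a sys.exc_info() triple saved by the capture thread.
    _, exc, tb = recorded
    raise RuntimeError("camera API failure: {}".format(exc)).with_traceback(tb)


def capture_failure() -> tuple:
    """Produce a sys.exc_info() triple the way a failing capture loop would."""
    try:
        raise OSError("sensor timeout")
    except OSError:
        return sys.exc_info()


ok = read_frame("frame-0", None)
```

Keeping the original traceback attached to the re-raised error makes the failure point inside the capture thread visible to the caller.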