Skip to content

PiGear API References

PiGear API usage examples can be found here ➶

PiGear API parameters are explained here ➶

PiGear implements a seamless and robust wrapper around the picamera2 python library, simplifying integration with minimal code changes and ensuring a smooth transition for developers already familiar with the Picamera2 API. PiGear leverages the libcamera API under the hood with multi-threading, providing high-performance 🔥, enhanced control and functionality for Raspberry Pi camera modules.

PiGear handles common configuration parameters and non-standard settings for various camera types, simplifying the integration process. PiGear currently supports picamera2 API parameters such as sensor, controls, transform, and stride, with internal type and sanity checks for robust performance.

While primarily focused on Raspberry Pi camera modules, PiGear also provides basic functionality for USB webcams only with Picamera2 API, along with the ability to accurately differentiate between USB and Raspberry Pi cameras using metadata.

Backward compatibility with picamera library

PiGear seamlessly switches to the legacy picamera library if the picamera2 library is unavailable, ensuring seamless backward compatibility. For this, PiGear also provides a flexible multi-threaded framework around complete picamera API, allowing developers to effortlessly exploit a wide range of parameters, such as brightness, saturation, sensor_mode, iso, exposure, and more.

Furthermore, PiGear supports the use of multiple camera modules, including those found on Raspberry Pi Compute Module IO boards and USB cameras (only with Picamera2 API).

Threaded Internal Timer

PiGear ensures proper resource release during the termination of the API, preventing potential issues or resource leaks. PiGear API internally implements a Threaded Internal Timer that silently keeps active track of any frozen-threads or hardware-failures and exits safely if any do occur. This means that if you're running the PiGear API in your script and someone accidentally pulls the Camera-Module cable out, instead of going into a possible kernel panic, the API will exit safely to save resources.

Make sure to enable Raspberry Pi hardware-specific settings prior using this API, otherwise nothing will work.

Source code in vidgear/gears/pigear.py
class PiGear:
    """
    PiGear implements a seamless and robust wrapper around the [picamera2](https://github.com/raspberrypi/picamera2) python library, simplifying integration with minimal code changes and ensuring a
    smooth transition for developers already familiar with the Picamera2 API. PiGear leverages the `libcamera` API under the hood with multi-threading, providing high-performance :fire:, enhanced
    control and functionality for Raspberry Pi camera modules.

    PiGear handles common configuration parameters and non-standard settings for various camera types, simplifying the integration process. PiGear currently supports picamera2 API parameters such as
    `sensor`, `controls`, `transform`, and `stride`, with internal type and sanity checks for robust performance.

    While primarily focused on Raspberry Pi camera modules, PiGear also provides basic functionality for USB webcams only with Picamera2 API, along with the ability to accurately differentiate between
    USB and Raspberry Pi cameras using metadata.

    ???+ info "Backward compatibility with `picamera` library"
        PiGear seamlessly switches to the legacy [picamera](https://picamera.readthedocs.io/en/release-1.13/index.html) library if the `picamera2` library is unavailable, ensuring seamless backward
        compatibility. For this, PiGear also provides a flexible multi-threaded framework around complete `picamera` API, allowing developers to effortlessly exploit a wide range of parameters, such
        as `brightness`, `saturation`, `sensor_mode`, `iso`, `exposure`, and more.

    Furthermore, PiGear supports the use of multiple camera modules, including those found on Raspberry Pi Compute Module IO boards and USB cameras _(only with Picamera2 API)_.

    ??? new "Threaded Internal Timer :material-camera-timer:"
        PiGear ensures proper resource release during the termination of the API, preventing potential issues or resource leaks. PiGear API internally implements a
        ==Threaded Internal Timer== that silently keeps active track of any frozen-threads or hardware-failures and exits safely if any do occur. This means that if
        you're running the PiGear API in your script and someone accidentally pulls the Camera-Module cable out, instead of going into a possible kernel panic,
        the API will exit safely to save resources.

    !!! failure "Make sure to [enable Raspberry Pi hardware-specific settings](https://picamera.readthedocs.io/en/release-1.13/quickstart.html) prior using this API, otherwise nothing will work."
    """

    def __init__(
        self,
        camera_num: int = 0,
        resolution: Tuple[int, int] = (640, 480),
        framerate: Union[int, float] = 30,
        colorspace: str = None,
        logging: bool = False,
        time_delay: int = 0,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the PiGear class.

        Parameters:
            camera_num (int): selects the camera module index which will be used as source.
            resolution (tuple): sets the resolution (i.e. `(width,height)`) of the source..
            framerate (int/float): sets the framerate of the source.
            colorspace (str): selects the colorspace of the input stream.
            logging (bool): enables/disables logging.
            time_delay (int): time delay (in sec) before start reading the frames.
            options (dict): provides ability to alter Source Tweak Parameters.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        global picamera, picamera2
        if picamera2:
            # log if picamera2
            self.__logging and logger.info("picamera2 API is currently being accessed.")
        elif picamera:
            # switch to picamera otherwise
            logger.critical(
                "picamera2 library not installed on this system. Defaulting to legacy picamera API."
            )
        else:
            # raise error if none
            import_dependency_safe("picamera")

        assert (
            isinstance(framerate, (int, float)) and framerate > 0.0
        ), "[PiGear:ERROR] :: Input framerate value `{}` is a Invalid! Kindly read docs.".format(
            framerate
        )
        assert (
            isinstance(resolution, (tuple, list)) and len(resolution) == 2
        ), "[PiGear:ERROR] :: Input resolution value `{}` is a Invalid! Kindly read docs.".format(
            resolution
        )
        if not (isinstance(camera_num, int) and camera_num >= 0):
            camera_num = 0
            logger.warning(
                "Input camera_num value `{}` is invalid, Defaulting to index 0!"
            )

        # reformat dict
        options = {str(k).strip(): v for k, v in options.items()}

        # check if legacy picamera backend is enforced
        enforce_legacy_picamera = options.pop("enforce_legacy_picamera", False)
        if isinstance(enforce_legacy_picamera, bool) and enforce_legacy_picamera:
            # check if picamera library is available.
            if picamera:
                logger.critical(
                    "Enforcing legacy picamera API for this run. picamera2 API access will be disabled!"
                )
                # disable picamera2
                picamera2 = None
            else:
                # raise error otherwise
                logger.error(
                    "`picamera` is unavailable or unsupported on this system, `enforce_legacy_picamera` will be discarded!"
                )
                import_dependency_safe("picamera")

        if picamera2:
            # handle logging
            not (self.__logging) and not os.getenv(
                "LIBCAMERA_LOG_LEVELS", False
            ) and logger.info(
                "Kindly set `LIBCAMERA_LOG_LEVELS=2` environment variable to disable common libcamera API messages."
            )
            # collect metadata
            cameras_metadata = Picamera2.global_camera_info()
            # initialize the picamera stream at given index
            self.__camera = Picamera2(camera_num=camera_num)
            # extract metadata for current camera
            camera_metadata = [x for x in cameras_metadata if x["Num"] == camera_num][0]
            # check connected camera is USB or I2C
            self.__camera_is_usb = True if "usb" in camera_metadata["Id"] else False
            # handle framerate control
            if not self.__camera_is_usb:
                self.__camera.set_controls({"FrameRate": framerate})
            else:
                logger.warning(
                    "USB camera detected. Setting input framerate is NOT supported with Picamera2 API!"
                )
            # log
            self.__logging and logger.debug(
                "Activating Picamera2 API for `{}` camera at index: {} with resolution: {} & framerate: {}".format(
                    camera_metadata["Model"],
                    camera_num,
                    resolution if not self.__camera_is_usb else "default",
                    framerate,
                )
            )
        else:
            # initialize the picamera stream at given index
            self.__camera = PiCamera(camera_num=camera_num)
            self.__camera.resolution = tuple(resolution)
            self.__camera.framerate = framerate
            self.__logging and logger.debug(
                "Activating Picamera API at index: {} with resolution: {} & framerate: {}".format(
                    camera_num, resolution, framerate
                )
            )

        # initialize framerate (Read-only) variable
        self.framerate = framerate

        # initializing colorspace variable
        self.color_space = None

        # define timeout variable default value(handles hardware failures)
        self.__failure_timeout = options.pop("HWFAILURE_TIMEOUT", 2.0)
        if isinstance(self.__failure_timeout, (int, float)):
            if not (10.0 > self.__failure_timeout > 1.0):
                raise ValueError(
                    "[PiGear:ERROR] :: `HWFAILURE_TIMEOUT` value can only be between 1.0 ~ 10.0"
                )
            self.__logging and logger.debug(
                "Setting HW Failure Timeout: {} seconds".format(self.__failure_timeout)
            )
        else:
            # reset improper values
            self.__failure_timeout = 2.0

        try:
            if picamera2:
                # define common supported picamera2 config parameters
                valid_config_options = [
                    "auto_align_output_size",  # internal
                    "enable_verbose_logs",  # internal
                    "format",
                    "sensor",
                ]

                # define non-USB supported picamera2 config parameters
                non_usb_options = [
                    "controls",  # not-supported on USB
                    "transform",  # not-working on USB
                    "buffer_count",  # not-supported on USB
                    "queue",  # not-supported on USB
                ]  # Less are supported (will be changed in future)

                # filter parameter supported with non-USB cameras only
                if self.__camera_is_usb:
                    unsupported_config_keys = set(list(options.keys())).intersection(
                        set(non_usb_options)
                    )
                    unsupported_config_keys and logger.warning(
                        "Setting parameters: `{}` for USB camera is NOT supported with Picamera2 API!".format(
                            "`, `".join(unsupported_config_keys)
                        )
                    )
                else:
                    valid_config_options += non_usb_options

                # log all invalid keys
                invalid_config_keys = set(list(options.keys())) - set(
                    valid_config_options
                )
                invalid_config_keys and logger.warning(
                    "Discarding invalid options NOT supported by Picamera2 API for current Camera Sensor: `{}`".format(
                        "`, `".join(invalid_config_keys)
                    )
                )
                # delete all unsupported options
                options = {
                    x: y for x, y in options.items() if x in valid_config_options
                }

                # setting size, already defined
                options.update({"size": tuple(resolution)})

                # set 24-bit, BGR format by default
                if not "format" in options:
                    # auto defaults for USB cameras
                    not self.__camera_is_usb and options.update({"format": "RGB888"})
                elif self.__camera_is_usb:
                    # check the supported formats, if USB camera
                    avail_formats = [
                        mode["format"] for mode in self.__camera.sensor_modes
                    ]
                    # handle unsupported formats
                    if not options["format"] in avail_formats:
                        logger.warning(
                            "Discarding `format={}`. `{}` are the only available formats for USB camera in use!".format(
                                options["format"], "`, `".join(avail_formats)
                            )
                        )
                        del options["format"]
                    else:
                        # `colorspace` parameter must define with  `format` optional parameter
                        # unless format is MPEG (tested)
                        (
                            not (colorspace is None) or options["format"] == "MPEG"
                        ) and logger.warning(
                            "Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
                                options["format"]
                            )
                        )
                else:
                    # `colorspace` parameter must define with  `format` optional parameter
                    # unless format is either BGR or BGRA
                    (
                        not (colorspace is None)
                        or options["format"] in ["RGB888", "XRGB8888",]
                    ) and logger.warning(
                        "Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
                            options["format"]
                        )
                    )

                # enable verbose logging mode (handled by Picamera2 API)
                verbose = options.pop("enable_verbose_logs", False)
                if self.__logging and isinstance(verbose, bool) and verbose:
                    self.__camera.set_logging(Picamera2.DEBUG)
                else:
                    # setup logging
                    self.__camera.set_logging(Picamera2.WARNING)

                # handle transformations, if specified
                transform = options.pop("transform", Transform())
                if not isinstance(transform, Transform):
                    logger.warning("`transform` value is of invalid type, Discarding!")
                    transform = Transform()

                # handle sensor configurations, if specified
                sensor = options.pop("sensor", {})
                if isinstance(sensor, dict):
                    # extract all valid sensor keys
                    valid_sensor = ["output_size", "bit_depth"]
                    # log all invalid keys
                    invalid_sensor_keys = set(list(sensor)) - set(valid_sensor)
                    invalid_sensor_keys and logger.warning(
                        "Discarding sensor properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
                            "`, `".join(invalid_sensor_keys), "`, `".join(valid_sensor),
                        )
                    )
                    # delete all unsupported control keys
                    sensor = {x: y for x, y in sensor.items() if x in valid_sensor}
                    # remove size if output size is defined
                    if "output_size" in sensor:
                        del options["size"]
                        logger.critical(
                            "Overriding output frame size with `output_size={}!".format(
                                sensor["output_size"]
                            )
                        )
                else:
                    logger.warning("`sensor` value is of invalid type, Discarding!")
                    sensor = {}

                # handle controls, if specified
                controls = options.pop("controls", {})
                if isinstance(controls, dict):
                    # extract all valid control keys
                    valid_controls = self.__camera.camera_controls
                    # remove any fps controls, assigned already
                    valid_controls.pop("FrameDuration", None)
                    valid_controls.pop("FrameDurationLimits", None)
                    # log all invalid keys
                    invalid_control_keys = set(list(controls.keys())) - set(
                        list(valid_controls.keys())
                    )
                    invalid_control_keys and logger.warning(
                        "Discarding control properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
                            "`, `".join(invalid_control_keys),
                            "`, `".join(list(valid_controls.keys())),
                        )
                    )
                    # delete all unsupported control keys
                    controls = {
                        x: y for x, y in controls.items() if x in valid_controls.keys()
                    }
                else:
                    logger.warning("`controls` value is of invalid type, Discarding!")
                    controls = {}

                # handle buffer_count, if specified
                buffer_count = options.pop("buffer_count", 4)
                if (
                    not isinstance(buffer_count, int) or buffer_count < 1
                ):  # must be greater than 1
                    logger.warning(
                        "`buffer_count` value is of invalid type, Discarding!"
                    )
                    # `create_preview_configuration` requests 4 sets of buffers
                    buffer_count = 4

                # handle queue, if specified
                queue = options.pop("queue", True)
                if not isinstance(queue, bool):
                    logger.warning("`queue` value is of invalid type, Discarding!")
                    queue = True

                # check if auto-align camera configuration is specified
                auto_align_output_size = options.pop("auto_align_output_size", False)

                # create default configuration for camera
                config = self.__camera.create_preview_configuration(
                    main=options,
                    transform=transform,
                    sensor=sensor,
                    controls=controls,
                    buffer_count=buffer_count,
                    queue=queue,
                )

                # auto-align camera configuration, if specified
                if isinstance(auto_align_output_size, bool) and auto_align_output_size:
                    self.__logging and logger.debug(
                        "Re-aligning Output frames to optimal size supported by current Camera Sensor."
                    )
                    self.__camera.align_configuration(config)

                # configure camera
                self.__camera.configure(config)
                self.__logging and logger.debug(
                    "Setting Picamera2 API Parameters: `{}`, controls: `{}`, sensor: `{}`, buffer_count: `{}`, and queue: `{}`.".format(
                        self.__camera.camera_configuration()["main"],
                        controls,
                        sensor,
                        buffer_count,
                        queue,
                    )
                )
            else:
                # apply attributes to source if specified
                for key, value in options.items():
                    self.__logging and logger.debug(
                        "Setting {} API Parameter for Picamera: `{}`".format(key, value)
                    )
                    setattr(self.__camera, key, value)
        except Exception as e:
            # Catch if any error occurred
            logger.exception(str(e))

        # separately handle colorspace value to int conversion
        if not (colorspace is None):
            self.color_space = capPropId(colorspace.strip())
            if self.__logging and not (self.color_space is None):
                logger.debug(
                    "Enabling `{}` colorspace for this video stream!".format(
                        colorspace.strip()
                    )
                )

        # enable rgb capture array thread and capture stream
        if not picamera2:
            self.__rawCapture = PiRGBArray(self.__camera, size=resolution)
            self.stream = self.__camera.capture_continuous(
                self.__rawCapture, format="bgr", use_video_port=True
            )

        # initialize frame variable
        # with captured frame
        try:
            if picamera2:
                # start camera thread
                self.__camera.start()
                # capture frame array
                self.frame = self.__camera.capture_array("main")
                # assign camera as stream for setting
                # parameters after starting the camera
                self.stream = self.__camera
            else:
                # capture frame array from stream
                stream = next(self.stream)
                self.frame = stream.array
                self.__rawCapture.seek(0)
                self.__rawCapture.truncate()
            # render colorspace if defined
            if not (self.frame is None) and not (self.color_space is None):
                self.frame = cv2.cvtColor(self.frame, self.color_space)
        except Exception as e:
            logger.exception(str(e))
            raise RuntimeError("[PiGear:ERROR] :: Camera Module failed to initialize!")

        # applying time delay to warm-up picamera only if specified
        if time_delay and isinstance(time_delay, (int, float)):
            time.sleep(time_delay)

        # thread initialization
        self.__thread = None

        # timer thread initialization(Keeps check on frozen thread)
        self.__timer = None
        self.__t_elapsed = 0.0  # records time taken by thread

        # catching thread exceptions
        self.__exceptions = None

        # initialize termination flag
        self.__terminate = False

    def start(self) -> PIGear:
        """
        Launches the internal *Threaded Frames Extractor* daemon

        **Returns:** A reference to the PiGear class object.
        """
        # Start frame producer thread
        self.__thread = Thread(target=self.__update, name="PiGear", args=())
        self.__thread.daemon = True
        self.__thread.start()

        # Start internal timer thread
        self.__timer = Thread(target=self.__timeit, name="PiTimer", args=())
        self.__timer.daemon = True
        self.__timer.start()

        return self

    def __timeit(self):
        """
        Threaded Internal Timer that keep checks on thread execution timing
        """
        # assign current time
        self.__t_elapsed = time.time()

        # loop until terminated
        while not (self.__terminate):
            # check for frozen thread
            if time.time() - self.__t_elapsed > self.__failure_timeout:
                # log failure
                self.__logging and logger.critical("Camera Module Disconnected!")
                # prepare for clean exit
                self.__exceptions = True
                self.__terminate = True  # self-terminate

    def __update(self):
        """
        A **Threaded Frames Extractor**, that keep iterating frames from PiCamera API to a internal monitored deque,
        until the thread is terminated, or frames runs out.
        """
        # keep looping infinitely until the thread is terminated
        while not (self.__terminate):
            if not picamera2:
                try:
                    # Try to iterate next frame from generator
                    stream = next(self.stream)
                except Exception:
                    # catch and save any exceptions
                    self.__exceptions = sys.exc_info()
                    break  # exit

            # __update timer
            self.__t_elapsed = time.time()

            # grab the frame from the stream
            if picamera2:
                frame = self.__camera.capture_array("main")
            else:
                frame = stream.array
                # clear the stream in preparation
                # for the next frame
                self.__rawCapture.seek(0)
                self.__rawCapture.truncate()

            # apply colorspace if specified
            if not (self.color_space is None):
                # apply colorspace to frames
                color_frame = None
                try:
                    color_frame = cv2.cvtColor(frame, self.color_space)
                except Exception as e:
                    # Catch if any error occurred
                    color_frame = None
                    self.color_space = None
                    self.__logging and logger.exception(str(e))
                    logger.warning("Assigned colorspace value is invalid. Discarding!")
                self.frame = color_frame if not (color_frame is None) else frame
            else:
                self.frame = frame

        # terminate processes
        if not (self.__terminate):
            self.__terminate = True

        # release resources
        if picamera2:
            self.__camera.stop()
        else:
            self.__rawCapture.close()
            self.__camera.close()

    def read(self) -> NDArray:
        """
        Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
        and blocks the thread if the deque is full.

        **Returns:** A n-dimensional numpy array.
        """
        # check if there are any thread exceptions
        if not (self.__exceptions is None):
            if isinstance(self.__exceptions, bool):
                # clear frame
                self.frame = None
                # notify user about hardware failure
                raise SystemError(
                    "[PiGear:ERROR] :: Hardware failure occurred, Kindly reconnect Camera Module and restart your Pi!"
                )
            else:
                # clear frame
                self.frame = None
                # re-raise error for debugging
                error_msg = "[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
                    self.__exceptions[1]
                )
                raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])
        # return the frame
        return self.frame

    def stop(self) -> None:
        """
        Safely terminates the thread, and release the multi-threaded resources.
        """
        # log termination
        self.__logging and logger.debug("Terminating PiGear Processes.")

        # make sure that the threads should be terminated
        self.__terminate = True

        # stop timer thread
        if not (self.__timer is None):
            self.__timer.join()
            self.__timer = None

        # handle camera thread
        if not (self.__thread is None):
            # check if hardware failure occurred
            if not (self.__exceptions is None) and isinstance(self.__exceptions, bool):
                if picamera2:
                    # release picamera2 resources
                    self.__camera.stop()
                else:
                    # force release picamera resources
                    self.__rawCapture.close()
                    self.__camera.close()
            # properly handle thread exit
            # wait if still process is still
            # processing some information
            self.__thread.join()
            # remove any threads
            self.__thread = None

__init__(self, camera_num=0, resolution=(640, 480), framerate=30, colorspace=None, logging=False, time_delay=0, **options) special

This constructor method initializes the object state and attributes of the PiGear class.

Parameters:

Name Type Description Default
camera_num int

selects the camera module index which will be used as source.

0
resolution tuple

sets the resolution (i.e. (width,height)) of the source..

(640, 480)
framerate int/float

sets the framerate of the source.

30
colorspace str

selects the colorspace of the input stream.

None
logging bool

enables/disables logging.

False
time_delay int

time delay (in sec) before start reading the frames.

0
options dict

provides ability to alter Source Tweak Parameters.

{}
Source code in vidgear/gears/pigear.py
def __init__(
    self,
    camera_num: int = 0,
    resolution: Tuple[int, int] = (640, 480),
    framerate: Union[int, float] = 30,
    colorspace: str = None,
    logging: bool = False,
    time_delay: int = 0,
    **options: dict
):
    """
    This constructor method initializes the object state and attributes of the PiGear class.

    Parameters:
        camera_num (int): selects the camera module index which will be used as source.
        resolution (tuple): sets the resolution (i.e. `(width,height)`) of the source..
        framerate (int/float): sets the framerate of the source.
        colorspace (str): selects the colorspace of the input stream.
        logging (bool): enables/disables logging.
        time_delay (int): time delay (in sec) before start reading the frames.
        options (dict): provides ability to alter Source Tweak Parameters.
    """
    # enable logging if specified
    self.__logging = logging if isinstance(logging, bool) else False

    # print current version
    logcurr_vidgear_ver(logging=self.__logging)

    # raise error(s) for critical Class imports
    global picamera, picamera2
    if picamera2:
        # log if picamera2
        self.__logging and logger.info("picamera2 API is currently being accessed.")
    elif picamera:
        # switch to picamera otherwise
        logger.critical(
            "picamera2 library not installed on this system. Defaulting to legacy picamera API."
        )
    else:
        # raise error if none
        import_dependency_safe("picamera")

    assert (
        isinstance(framerate, (int, float)) and framerate > 0.0
    ), "[PiGear:ERROR] :: Input framerate value `{}` is a Invalid! Kindly read docs.".format(
        framerate
    )
    assert (
        isinstance(resolution, (tuple, list)) and len(resolution) == 2
    ), "[PiGear:ERROR] :: Input resolution value `{}` is a Invalid! Kindly read docs.".format(
        resolution
    )
    if not (isinstance(camera_num, int) and camera_num >= 0):
        camera_num = 0
        logger.warning(
            "Input camera_num value `{}` is invalid, Defaulting to index 0!"
        )

    # reformat dict
    options = {str(k).strip(): v for k, v in options.items()}

    # check if legacy picamera backend is enforced
    enforce_legacy_picamera = options.pop("enforce_legacy_picamera", False)
    if isinstance(enforce_legacy_picamera, bool) and enforce_legacy_picamera:
        # check if picamera library is available.
        if picamera:
            logger.critical(
                "Enforcing legacy picamera API for this run. picamera2 API access will be disabled!"
            )
            # disable picamera2
            picamera2 = None
        else:
            # raise error otherwise
            logger.error(
                "`picamera` is unavailable or unsupported on this system, `enforce_legacy_picamera` will be discarded!"
            )
            import_dependency_safe("picamera")

    if picamera2:
        # handle logging
        not (self.__logging) and not os.getenv(
            "LIBCAMERA_LOG_LEVELS", False
        ) and logger.info(
            "Kindly set `LIBCAMERA_LOG_LEVELS=2` environment variable to disable common libcamera API messages."
        )
        # collect metadata
        cameras_metadata = Picamera2.global_camera_info()
        # initialize the picamera stream at given index
        self.__camera = Picamera2(camera_num=camera_num)
        # extract metadata for current camera
        camera_metadata = [x for x in cameras_metadata if x["Num"] == camera_num][0]
        # check connected camera is USB or I2C
        self.__camera_is_usb = True if "usb" in camera_metadata["Id"] else False
        # handle framerate control
        if not self.__camera_is_usb:
            self.__camera.set_controls({"FrameRate": framerate})
        else:
            logger.warning(
                "USB camera detected. Setting input framerate is NOT supported with Picamera2 API!"
            )
        # log
        self.__logging and logger.debug(
            "Activating Picamera2 API for `{}` camera at index: {} with resolution: {} & framerate: {}".format(
                camera_metadata["Model"],
                camera_num,
                resolution if not self.__camera_is_usb else "default",
                framerate,
            )
        )
    else:
        # initialize the picamera stream at given index
        self.__camera = PiCamera(camera_num=camera_num)
        self.__camera.resolution = tuple(resolution)
        self.__camera.framerate = framerate
        self.__logging and logger.debug(
            "Activating Picamera API at index: {} with resolution: {} & framerate: {}".format(
                camera_num, resolution, framerate
            )
        )

    # initialize framerate (Read-only) variable
    self.framerate = framerate

    # initializing colorspace variable
    self.color_space = None

    # define timeout variable default value(handles hardware failures)
    self.__failure_timeout = options.pop("HWFAILURE_TIMEOUT", 2.0)
    if isinstance(self.__failure_timeout, (int, float)):
        if not (10.0 > self.__failure_timeout > 1.0):
            raise ValueError(
                "[PiGear:ERROR] :: `HWFAILURE_TIMEOUT` value can only be between 1.0 ~ 10.0"
            )
        self.__logging and logger.debug(
            "Setting HW Failure Timeout: {} seconds".format(self.__failure_timeout)
        )
    else:
        # reset improper values
        self.__failure_timeout = 2.0

    try:
        if picamera2:
            # define common supported picamera2 config parameters
            valid_config_options = [
                "auto_align_output_size",  # internal
                "enable_verbose_logs",  # internal
                "format",
                "sensor",
            ]

            # define non-USB supported picamera2 config parameters
            non_usb_options = [
                "controls",  # not-supported on USB
                "transform",  # not-working on USB
                "buffer_count",  # not-supported on USB
                "queue",  # not-supported on USB
            ]  # Less are supported (will be changed in future)

            # filter parameter supported with non-USB cameras only
            if self.__camera_is_usb:
                unsupported_config_keys = set(list(options.keys())).intersection(
                    set(non_usb_options)
                )
                unsupported_config_keys and logger.warning(
                    "Setting parameters: `{}` for USB camera is NOT supported with Picamera2 API!".format(
                        "`, `".join(unsupported_config_keys)
                    )
                )
            else:
                valid_config_options += non_usb_options

            # log all invalid keys
            invalid_config_keys = set(list(options.keys())) - set(
                valid_config_options
            )
            invalid_config_keys and logger.warning(
                "Discarding invalid options NOT supported by Picamera2 API for current Camera Sensor: `{}`".format(
                    "`, `".join(invalid_config_keys)
                )
            )
            # delete all unsupported options
            options = {
                x: y for x, y in options.items() if x in valid_config_options
            }

            # setting size, already defined
            options.update({"size": tuple(resolution)})

            # set 24-bit, BGR format by default
            if not "format" in options:
                # auto defaults for USB cameras
                not self.__camera_is_usb and options.update({"format": "RGB888"})
            elif self.__camera_is_usb:
                # check the supported formats, if USB camera
                avail_formats = [
                    mode["format"] for mode in self.__camera.sensor_modes
                ]
                # handle unsupported formats
                if not options["format"] in avail_formats:
                    logger.warning(
                        "Discarding `format={}`. `{}` are the only available formats for USB camera in use!".format(
                            options["format"], "`, `".join(avail_formats)
                        )
                    )
                    del options["format"]
                else:
                    # `colorspace` parameter must define with  `format` optional parameter
                    # unless format is MPEG (tested)
                    (
                        not (colorspace is None) or options["format"] == "MPEG"
                    ) and logger.warning(
                        "Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
                            options["format"]
                        )
                    )
            else:
                # `colorspace` parameter must define with  `format` optional parameter
                # unless format is either BGR or BGRA
                (
                    not (colorspace is None)
                    or options["format"] in ["RGB888", "XRGB8888",]
                ) and logger.warning(
                    "Custom Output frames `format={}` detected. It is advised to define `colorspace` parameter or handle this format manually in your code!".format(
                        options["format"]
                    )
                )

            # enable verbose logging mode (handled by Picamera2 API)
            verbose = options.pop("enable_verbose_logs", False)
            if self.__logging and isinstance(verbose, bool) and verbose:
                self.__camera.set_logging(Picamera2.DEBUG)
            else:
                # setup logging
                self.__camera.set_logging(Picamera2.WARNING)

            # handle transformations, if specified
            transform = options.pop("transform", Transform())
            if not isinstance(transform, Transform):
                logger.warning("`transform` value is of invalid type, Discarding!")
                transform = Transform()

            # handle sensor configurations, if specified
            sensor = options.pop("sensor", {})
            if isinstance(sensor, dict):
                # extract all valid sensor keys
                valid_sensor = ["output_size", "bit_depth"]
                # log all invalid keys
                invalid_sensor_keys = set(list(sensor)) - set(valid_sensor)
                invalid_sensor_keys and logger.warning(
                    "Discarding sensor properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
                        "`, `".join(invalid_sensor_keys), "`, `".join(valid_sensor),
                    )
                )
                # delete all unsupported control keys
                sensor = {x: y for x, y in sensor.items() if x in valid_sensor}
                # remove size if output size is defined
                if "output_size" in sensor:
                    del options["size"]
                    logger.critical(
                        "Overriding output frame size with `output_size={}!".format(
                            sensor["output_size"]
                        )
                    )
            else:
                logger.warning("`sensor` value is of invalid type, Discarding!")
                sensor = {}

            # handle controls, if specified
            controls = options.pop("controls", {})
            if isinstance(controls, dict):
                # extract all valid control keys
                valid_controls = self.__camera.camera_controls
                # remove any fps controls, assigned already
                valid_controls.pop("FrameDuration", None)
                valid_controls.pop("FrameDurationLimits", None)
                # log all invalid keys
                invalid_control_keys = set(list(controls.keys())) - set(
                    list(valid_controls.keys())
                )
                invalid_control_keys and logger.warning(
                    "Discarding control properties NOT supported by current Camera Sensor: `{}`. Only supported are: (`{}`)".format(
                        "`, `".join(invalid_control_keys),
                        "`, `".join(list(valid_controls.keys())),
                    )
                )
                # delete all unsupported control keys
                controls = {
                    x: y for x, y in controls.items() if x in valid_controls.keys()
                }
            else:
                logger.warning("`controls` value is of invalid type, Discarding!")
                controls = {}

            # handle buffer_count, if specified
            buffer_count = options.pop("buffer_count", 4)
            if (
                not isinstance(buffer_count, int) or buffer_count < 1
            ):  # must be greater than 1
                logger.warning(
                    "`buffer_count` value is of invalid type, Discarding!"
                )
                # `create_preview_configuration` requests 4 sets of buffers
                buffer_count = 4

            # handle queue, if specified
            queue = options.pop("queue", True)
            if not isinstance(queue, bool):
                logger.warning("`queue` value is of invalid type, Discarding!")
                queue = True

            # check if auto-align camera configuration is specified
            auto_align_output_size = options.pop("auto_align_output_size", False)

            # create default configuration for camera
            config = self.__camera.create_preview_configuration(
                main=options,
                transform=transform,
                sensor=sensor,
                controls=controls,
                buffer_count=buffer_count,
                queue=queue,
            )

            # auto-align camera configuration, if specified
            if isinstance(auto_align_output_size, bool) and auto_align_output_size:
                self.__logging and logger.debug(
                    "Re-aligning Output frames to optimal size supported by current Camera Sensor."
                )
                self.__camera.align_configuration(config)

            # configure camera
            self.__camera.configure(config)
            self.__logging and logger.debug(
                "Setting Picamera2 API Parameters: `{}`, controls: `{}`, sensor: `{}`, buffer_count: `{}`, and queue: `{}`.".format(
                    self.__camera.camera_configuration()["main"],
                    controls,
                    sensor,
                    buffer_count,
                    queue,
                )
            )
        else:
            # apply attributes to source if specified
            for key, value in options.items():
                self.__logging and logger.debug(
                    "Setting {} API Parameter for Picamera: `{}`".format(key, value)
                )
                setattr(self.__camera, key, value)
    except Exception as e:
        # Catch if any error occurred
        logger.exception(str(e))

    # separately handle colorspace value to int conversion
    if not (colorspace is None):
        self.color_space = capPropId(colorspace.strip())
        if self.__logging and not (self.color_space is None):
            logger.debug(
                "Enabling `{}` colorspace for this video stream!".format(
                    colorspace.strip()
                )
            )

    # enable rgb capture array thread and capture stream
    if not picamera2:
        self.__rawCapture = PiRGBArray(self.__camera, size=resolution)
        self.stream = self.__camera.capture_continuous(
            self.__rawCapture, format="bgr", use_video_port=True
        )

    # initialize frame variable
    # with captured frame
    try:
        if picamera2:
            # start camera thread
            self.__camera.start()
            # capture frame array
            self.frame = self.__camera.capture_array("main")
            # assign camera as stream for setting
            # parameters after starting the camera
            self.stream = self.__camera
        else:
            # capture frame array from stream
            stream = next(self.stream)
            self.frame = stream.array
            self.__rawCapture.seek(0)
            self.__rawCapture.truncate()
        # render colorspace if defined
        if not (self.frame is None) and not (self.color_space is None):
            self.frame = cv2.cvtColor(self.frame, self.color_space)
    except Exception as e:
        logger.exception(str(e))
        raise RuntimeError("[PiGear:ERROR] :: Camera Module failed to initialize!")

    # applying time delay to warm-up picamera only if specified
    if time_delay and isinstance(time_delay, (int, float)):
        time.sleep(time_delay)

    # thread initialization
    self.__thread = None

    # timer thread initialization(Keeps check on frozen thread)
    self.__timer = None
    self.__t_elapsed = 0.0  # records time taken by thread

    # catching thread exceptions
    self.__exceptions = None

    # initialize termination flag
    self.__terminate = False

read(self)

Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full.

Returns: A n-dimensional numpy array.

Source code in vidgear/gears/pigear.py
def read(self) -> NDArray:
    """
    Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
    and blocks the thread if the deque is full.

    **Returns:** A n-dimensional numpy array.
    """
    # check if there are any thread exceptions
    if not (self.__exceptions is None):
        if isinstance(self.__exceptions, bool):
            # clear frame
            self.frame = None
            # notify user about hardware failure
            raise SystemError(
                "[PiGear:ERROR] :: Hardware failure occurred, Kindly reconnect Camera Module and restart your Pi!"
            )
        else:
            # clear frame
            self.frame = None
            # re-raise error for debugging
            error_msg = "[PiGear:ERROR] :: Camera Module API failure occurred: {}".format(
                self.__exceptions[1]
            )
            raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])
    # return the frame
    return self.frame

start(self)

Launches the internal Threaded Frames Extractor daemon

Returns: A reference to the PiGear class object.

Source code in vidgear/gears/pigear.py
def start(self) -> PIGear:
    """
    Launches the internal *Threaded Frames Extractor* daemon

    **Returns:** A reference to the PiGear class object.
    """
    # Start frame producer thread
    self.__thread = Thread(target=self.__update, name="PiGear", args=())
    self.__thread.daemon = True
    self.__thread.start()

    # Start internal timer thread
    self.__timer = Thread(target=self.__timeit, name="PiTimer", args=())
    self.__timer.daemon = True
    self.__timer.start()

    return self

stop(self)

Safely terminates the thread, and release the multi-threaded resources.

Source code in vidgear/gears/pigear.py
def stop(self) -> None:
    """
    Safely terminates the thread, and release the multi-threaded resources.
    """
    # log termination
    self.__logging and logger.debug("Terminating PiGear Processes.")

    # make sure that the threads should be terminated
    self.__terminate = True

    # stop timer thread
    if not (self.__timer is None):
        self.__timer.join()
        self.__timer = None

    # handle camera thread
    if not (self.__thread is None):
        # check if hardware failure occurred
        if not (self.__exceptions is None) and isinstance(self.__exceptions, bool):
            if picamera2:
                # release picamera2 resources
                self.__camera.stop()
            else:
                # force release picamera resources
                self.__rawCapture.close()
                self.__camera.close()
        # properly handle thread exit
        # wait if still process is still
        # processing some information
        self.__thread.join()
        # remove any threads
        self.__thread = None