PiGear API
PiGear API usage examples can be found here ➶
PiGear API parameters are explained here ➶
PiGear is similar to CamGear API but exclusively made to support various Raspberry Pi Camera Modules (such as OmniVision OV5647 Camera Module and Sony IMX219 Camera Module). PiGear provides a flexible multi-threaded framework around complete picamera python library, and provide us the ability to exploit almost all of its parameters like brightness, saturation, sensor_mode, iso, exposure, etc. effortlessly. Furthermore, PiGear also supports multiple camera modules, such as in the case of Raspberry-Pi Compute Module IO boards.
Best of all, PiGear contains Threaded Internal Timer - that silently keeps active track of any frozen-threads/hardware-failures and exit safely, if any does occur. That means that if you're running PiGear API in your script and someone accidentally pulls the Camera-Module cable out, instead of going into possible kernel panic, API will exit safely to save resources.
Make sure to enable Raspberry Pi hardware-specific settings prior using this API, otherwise nothing will work.
Source code in vidgear/gears/pigear.py
class PiGear:
"""
PiGear is similar to CamGear API but exclusively made to support various Raspberry Pi Camera Modules (such as OmniVision OV5647 Camera Module and Sony IMX219 Camera Module).
PiGear provides a flexible multi-threaded framework around complete picamera python library, and provide us the ability to exploit almost all of its parameters like brightness,
saturation, sensor_mode, iso, exposure, etc. effortlessly. Furthermore, PiGear also supports multiple camera modules, such as in the case of Raspberry-Pi Compute Module IO boards.
Best of all, PiGear contains Threaded Internal Timer - that silently keeps active track of any frozen-threads/hardware-failures and exit safely, if any does occur. That means that
if you're running PiGear API in your script and someone accidentally pulls the Camera-Module cable out, instead of going into possible kernel panic, API will exit safely to save resources.
!!! warning "Make sure to enable [Raspberry Pi hardware-specific settings](https://picamera.readthedocs.io/en/release-1.13/quickstart.html) prior using this API, otherwise nothing will work."
"""
def __init__(
self,
camera_num=0,
resolution=(640, 480),
framerate=30,
colorspace=None,
logging=False,
time_delay=0,
**options
):
"""
This constructor method initializes the object state and attributes of the PiGear class.
Parameters:
camera_num (int): selects the camera module index which will be used as source.
resolution (tuple): sets the resolution (i.e. `(width,height)`) of the source..
framerate (int/float): sets the framerate of the source.
colorspace (str): selects the colorspace of the input stream.
logging (bool): enables/disables logging.
time_delay (int): time delay (in sec) before start reading the frames.
options (dict): provides ability to alter Source Tweak Parameters.
"""
# print current version
logcurr_vidgear_ver(logging=logging)
# raise error(s) for critical Class imports
import_dependency_safe(
"picamera" if picamera is None else "",
)
# enable logging if specified
self.__logging = False
if logging:
self.__logging = logging
assert (
isinstance(framerate, (int, float)) and framerate > 5.0
), "[PiGear:ERROR] :: Input framerate value `{}` is a Invalid! Kindly read docs.".format(
framerate
)
assert (
isinstance(resolution, (tuple, list)) and len(resolution) == 2
), "[PiGear:ERROR] :: Input resolution value `{}` is a Invalid! Kindly read docs.".format(
resolution
)
if not (isinstance(camera_num, int) and camera_num >= 0):
camera_num = 0
logger.warning(
"Input camera_num value `{}` is invalid, Defaulting to index 0!"
)
# initialize the picamera stream at given index
self.__camera = PiCamera(camera_num=camera_num)
self.__camera.resolution = tuple(resolution)
self.__camera.framerate = framerate
self.__logging and logger.debug(
"Activating Pi camera at index: {} with resolution: {} & framerate: {}".format(
camera_num, resolution, framerate
)
)
# initialize framerate variable
self.framerate = framerate
# initializing colorspace variable
self.color_space = None
# reformat dict
options = {str(k).strip(): v for k, v in options.items()}
# define timeout variable default value(handles hardware failures)
self.__failure_timeout = options.pop("HWFAILURE_TIMEOUT", 2.0)
if isinstance(self.__failure_timeout, (int, float)):
if not (10.0 > self.__failure_timeout > 1.0):
raise ValueError(
"[PiGear:ERROR] :: `HWFAILURE_TIMEOUT` value can only be between 1.0 ~ 10.0"
)
self.__logging and logger.debug(
"Setting HW Failure Timeout: {} seconds".format(self.__failure_timeout)
)
else:
# reset improper values
self.__failure_timeout = 2.0
try:
# apply attributes to source if specified
for key, value in options.items():
self.__logging and logger.debug(
"Setting Parameter: {} = '{}'".format(key, value)
)
setattr(self.__camera, key, value)
except Exception as e:
# Catch if any error occurred
logger.exception(str(e))
# separately handle colorspace value to int conversion
if not (colorspace is None):
self.color_space = capPropId(colorspace.strip())
if self.__logging and not (self.color_space is None):
logger.debug(
"Enabling `{}` colorspace for this video stream!".format(
colorspace.strip()
)
)
# enable rgb capture array thread and capture stream
self.__rawCapture = PiRGBArray(self.__camera, size=resolution)
self.stream = self.__camera.capture_continuous(
self.__rawCapture, format="bgr", use_video_port=True
)
# frame variable initialization
self.frame = None
try:
stream = next(self.stream)
self.frame = stream.array
self.__rawCapture.seek(0)
self.__rawCapture.truncate()
# render colorspace if defined
if not (self.frame is None) and not (self.color_space is None):
self.frame = cv2.cvtColor(self.frame, self.color_space)
except Exception as e:
logger.exception(str(e))
raise RuntimeError("[PiGear:ERROR] :: Camera Module failed to initialize!")
# applying time delay to warm-up picamera only if specified
if time_delay and isinstance(time_delay, (int, float)):
time.sleep(time_delay)
# thread initialization
self.__thread = None
# timer thread initialization(Keeps check on frozen thread)
self.__timer = None
self.__t_elasped = 0.0 # records time taken by thread
# catching thread exceptions
self.__exceptions = None
# initialize termination flag
self.__terminate = False
def start(self):
"""
Launches the internal *Threaded Frames Extractor* daemon
**Returns:** A reference to the CamGear class object.
"""
# Start frame producer thread
self.__thread = Thread(target=self.__update, name="PiGear", args=())
self.__thread.daemon = True
self.__thread.start()
# Start internal timer thread
self.__timer = Thread(target=self.__timeit, name="PiTimer", args=())
self.__timer.daemon = True
self.__timer.start()
return self
def __timeit(self):
"""
Threaded Internal Timer that keep checks on thread execution timing
"""
# assign current time
self.__t_elasped = time.time()
# loop until termainated
while not (self.__terminate):
# check for frozen thread
if time.time() - self.__t_elasped > self.__failure_timeout:
# log failure
self.__logging and logger.critical("Camera Module Disconnected!")
# prepare for clean exit
self.__exceptions = True
self.__terminate = True # self-terminate
def __update(self):
"""
A **Threaded Frames Extractor**, that keep iterating frames from PiCamera API to a internal monitored deque,
until the thread is terminated, or frames runs out.
"""
# keep looping infinitely until the thread is terminated
while not (self.__terminate):
try:
# Try to iterate next frame from generator
stream = next(self.stream)
except Exception:
# catch and save any exceptions
self.__exceptions = sys.exc_info()
break # exit
# __update timer
self.__t_elasped = time.time()
# grab the frame from the stream and clear the stream in
# preparation for the next frame
frame = stream.array
self.__rawCapture.seek(0)
self.__rawCapture.truncate()
# apply colorspace if specified
if not (self.color_space is None):
# apply colorspace to frames
color_frame = None
try:
if isinstance(self.color_space, int):
color_frame = cv2.cvtColor(frame, self.color_space)
else:
self.__logging and logger.warning(
"Global color_space parameter value `{}` is not a valid!".format(
self.color_space
)
)
self.color_space = None
except Exception as e:
# Catch if any error occurred
self.color_space = None
if self.__logging:
logger.exception(str(e))
logger.warning("Input colorspace is not a valid colorspace!")
if not (color_frame is None):
self.frame = color_frame
else:
self.frame = frame
else:
self.frame = frame
# terminate processes
if not (self.__terminate):
self.__terminate = True
# release picamera resources
self.__rawCapture.close()
self.__camera.close()
def read(self):
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the deque is full.
**Returns:** A n-dimensional numpy array.
"""
# check if there are any thread exceptions
if not (self.__exceptions is None):
if isinstance(self.__exceptions, bool):
# clear frame
self.frame = None
# notify user about hardware failure
raise SystemError(
"[PiGear:ERROR] :: Hardware failure occurred, Kindly reconnect Camera Module and restart your Pi!"
)
else:
# clear frame
self.frame = None
# re-raise error for debugging
error_msg = (
"[PiGear:ERROR] :: Camera Module API failure occured: {}".format(
self.__exceptions[1]
)
)
raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])
# return the frame
return self.frame
def stop(self):
"""
Safely terminates the thread, and release the VideoStream resources.
"""
self.__logging and logger.debug("Terminating PiGear Processes.")
# make sure that the threads should be terminated
self.__terminate = True
# stop timer thread
if not (self.__timer is None):
self.__timer.join()
self.__timer = None
# handle camera thread
if not (self.__thread is None):
# check if hardware failure occured
if not (self.__exceptions is None) and isinstance(self.__exceptions, bool):
# force release picamera resources
self.__rawCapture.close()
self.__camera.close()
# properly handle thread exit
self.__thread.join() # wait if still process is still processing some information
# remove any threads
self.__thread = None
__init__(self, camera_num=0, resolution=(640, 480), framerate=30, colorspace=None, logging=False, time_delay=0, **options)
special
⚓
This constructor method initializes the object state and attributes of the PiGear class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
camera_num | int | selects the camera module index which will be used as source. | 0 |
resolution | tuple | sets the resolution (i.e. | (640, 480) |
framerate | int/float | sets the framerate of the source. | 30 |
colorspace | str | selects the colorspace of the input stream. | None |
logging | bool | enables/disables logging. | False |
time_delay | int | time delay (in sec) before start reading the frames. | 0 |
options | dict | provides ability to alter Source Tweak Parameters. | {} |
Source code in vidgear/gears/pigear.py
def __init__(
self,
camera_num=0,
resolution=(640, 480),
framerate=30,
colorspace=None,
logging=False,
time_delay=0,
**options
):
"""
This constructor method initializes the object state and attributes of the PiGear class.
Parameters:
camera_num (int): selects the camera module index which will be used as source.
resolution (tuple): sets the resolution (i.e. `(width,height)`) of the source..
framerate (int/float): sets the framerate of the source.
colorspace (str): selects the colorspace of the input stream.
logging (bool): enables/disables logging.
time_delay (int): time delay (in sec) before start reading the frames.
options (dict): provides ability to alter Source Tweak Parameters.
"""
# print current version
logcurr_vidgear_ver(logging=logging)
# raise error(s) for critical Class imports
import_dependency_safe(
"picamera" if picamera is None else "",
)
# enable logging if specified
self.__logging = False
if logging:
self.__logging = logging
assert (
isinstance(framerate, (int, float)) and framerate > 5.0
), "[PiGear:ERROR] :: Input framerate value `{}` is a Invalid! Kindly read docs.".format(
framerate
)
assert (
isinstance(resolution, (tuple, list)) and len(resolution) == 2
), "[PiGear:ERROR] :: Input resolution value `{}` is a Invalid! Kindly read docs.".format(
resolution
)
if not (isinstance(camera_num, int) and camera_num >= 0):
camera_num = 0
logger.warning(
"Input camera_num value `{}` is invalid, Defaulting to index 0!"
)
# initialize the picamera stream at given index
self.__camera = PiCamera(camera_num=camera_num)
self.__camera.resolution = tuple(resolution)
self.__camera.framerate = framerate
self.__logging and logger.debug(
"Activating Pi camera at index: {} with resolution: {} & framerate: {}".format(
camera_num, resolution, framerate
)
)
# initialize framerate variable
self.framerate = framerate
# initializing colorspace variable
self.color_space = None
# reformat dict
options = {str(k).strip(): v for k, v in options.items()}
# define timeout variable default value(handles hardware failures)
self.__failure_timeout = options.pop("HWFAILURE_TIMEOUT", 2.0)
if isinstance(self.__failure_timeout, (int, float)):
if not (10.0 > self.__failure_timeout > 1.0):
raise ValueError(
"[PiGear:ERROR] :: `HWFAILURE_TIMEOUT` value can only be between 1.0 ~ 10.0"
)
self.__logging and logger.debug(
"Setting HW Failure Timeout: {} seconds".format(self.__failure_timeout)
)
else:
# reset improper values
self.__failure_timeout = 2.0
try:
# apply attributes to source if specified
for key, value in options.items():
self.__logging and logger.debug(
"Setting Parameter: {} = '{}'".format(key, value)
)
setattr(self.__camera, key, value)
except Exception as e:
# Catch if any error occurred
logger.exception(str(e))
# separately handle colorspace value to int conversion
if not (colorspace is None):
self.color_space = capPropId(colorspace.strip())
if self.__logging and not (self.color_space is None):
logger.debug(
"Enabling `{}` colorspace for this video stream!".format(
colorspace.strip()
)
)
# enable rgb capture array thread and capture stream
self.__rawCapture = PiRGBArray(self.__camera, size=resolution)
self.stream = self.__camera.capture_continuous(
self.__rawCapture, format="bgr", use_video_port=True
)
# frame variable initialization
self.frame = None
try:
stream = next(self.stream)
self.frame = stream.array
self.__rawCapture.seek(0)
self.__rawCapture.truncate()
# render colorspace if defined
if not (self.frame is None) and not (self.color_space is None):
self.frame = cv2.cvtColor(self.frame, self.color_space)
except Exception as e:
logger.exception(str(e))
raise RuntimeError("[PiGear:ERROR] :: Camera Module failed to initialize!")
# applying time delay to warm-up picamera only if specified
if time_delay and isinstance(time_delay, (int, float)):
time.sleep(time_delay)
# thread initialization
self.__thread = None
# timer thread initialization(Keeps check on frozen thread)
self.__timer = None
self.__t_elasped = 0.0 # records time taken by thread
# catching thread exceptions
self.__exceptions = None
# initialize termination flag
self.__terminate = False
read(self)
⚓
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full.
Returns: A n-dimensional numpy array.
Source code in vidgear/gears/pigear.py
def read(self):
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the deque is full.
**Returns:** A n-dimensional numpy array.
"""
# check if there are any thread exceptions
if not (self.__exceptions is None):
if isinstance(self.__exceptions, bool):
# clear frame
self.frame = None
# notify user about hardware failure
raise SystemError(
"[PiGear:ERROR] :: Hardware failure occurred, Kindly reconnect Camera Module and restart your Pi!"
)
else:
# clear frame
self.frame = None
# re-raise error for debugging
error_msg = (
"[PiGear:ERROR] :: Camera Module API failure occured: {}".format(
self.__exceptions[1]
)
)
raise RuntimeError(error_msg).with_traceback(self.__exceptions[2])
# return the frame
return self.frame
start(self)
⚓
Launches the internal Threaded Frames Extractor daemon
Returns: A reference to the CamGear class object.
Source code in vidgear/gears/pigear.py
def start(self):
"""
Launches the internal *Threaded Frames Extractor* daemon
**Returns:** A reference to the CamGear class object.
"""
# Start frame producer thread
self.__thread = Thread(target=self.__update, name="PiGear", args=())
self.__thread.daemon = True
self.__thread.start()
# Start internal timer thread
self.__timer = Thread(target=self.__timeit, name="PiTimer", args=())
self.__timer.daemon = True
self.__timer.start()
return self
stop(self)
⚓
Safely terminates the thread, and release the VideoStream resources.
Source code in vidgear/gears/pigear.py
def stop(self):
"""
Safely terminates the thread, and release the VideoStream resources.
"""
self.__logging and logger.debug("Terminating PiGear Processes.")
# make sure that the threads should be terminated
self.__terminate = True
# stop timer thread
if not (self.__timer is None):
self.__timer.join()
self.__timer = None
# handle camera thread
if not (self.__thread is None):
# check if hardware failure occured
if not (self.__exceptions is None) and isinstance(self.__exceptions, bool):
# force release picamera resources
self.__rawCapture.close()
self.__camera.close()
# properly handle thread exit
self.__thread.join() # wait if still process is still processing some information
# remove any threads
self.__thread = None