ScreenGear API
ScreenGear API usage examples can be found here ➶
ScreenGear API parameters are explained here ➶
ScreenGear is designed exclusively for ultra-fast Screencasting, which means it can grab frames from your monitor in real-time, either by defining an area on the computer screen or full-screen, at the expense of inconsiderable latency. ScreenGear also seamlessly support frame capturing from multiple monitors as well as supports multiple backends.
ScreenGear API implements a multi-threaded wrapper around pyscreenshot & python-mss python library, and also flexibly supports its internal parameter.
Source code in vidgear/gears/screengear.py
class ScreenGear:
"""
ScreenGear is designed exclusively for ultra-fast Screencasting, which means it can grab frames from your monitor in real-time, either by defining an area on the computer screen or full-screen,
at the expense of inconsiderable latency. ScreenGear also seamlessly support frame capturing from multiple monitors as well as supports multiple backends.
ScreenGear API implements a multi-threaded wrapper around pyscreenshot & python-mss python library, and also flexibly supports its internal parameter.
"""
def __init__(
self, monitor=None, backend="", colorspace=None, logging=False, **options
):
"""
This constructor method initializes the object state and attributes of the ScreenGear class.
Parameters:
monitor (int): enables `mss` backend and sets the index of the monitor screen.
backend (str): enables `pyscreenshot` and select suitable backend for extracting frames.
colorspace (str): selects the colorspace of the input stream.
logging (bool): enables/disables logging.
options (dict): provides the flexibility to manually set the dimensions of capture screen area.
"""
# print current version
logcurr_vidgear_ver(logging=logging)
# raise error(s) for critical Class imports
import_dependency_safe(
"from mss import mss" if mss is None else "", pkg_name="mss"
)
import_dependency_safe("pyscreenshot" if pysct is None else "")
# enable logging if specified:
self.__logging = logging if isinstance(logging, bool) else False
# create monitor instance for the user-defined monitor
self.__monitor_instance = None
self.__backend = ""
if monitor is None:
self.__capture_object = pysct
self.__backend = backend.lower().strip()
else:
self.__capture_object = mss()
if backend.strip():
logger.warning(
"Backends are disabled for Monitor Indexing(monitor>=0)!"
)
try:
self.__monitor_instance = self.__capture_object.monitors[monitor]
except Exception as e:
logger.exception(str(e))
self.__monitor_instance = None
# assigns special parameter to global variable and clear
# Thread Timeout
self.__thread_timeout = options.pop("THREAD_TIMEOUT", None)
if self.__thread_timeout and isinstance(self.__thread_timeout, (int, float)):
# set values
self.__thread_timeout = float(self.__thread_timeout)
else:
# defaults to 5mins timeout
self.__thread_timeout = None
# define deque and assign it to global var
self.__queue = queue.Queue(maxsize=96) # max bufferlen 96 to check overflow
# log it
if logging:
logger.debug("Enabling Threaded Queue Mode by default for ScreenGear!")
if self.__thread_timeout:
logger.debug(
"Setting Video-Thread Timeout to {}s.".format(self.__thread_timeout)
)
# intiate screen dimension handler
screen_dims = {}
# reformat proper mss dict and assign to screen dimension handler
screen_dims = {
k.strip(): v
for k, v in options.items()
if k.strip() in ["top", "left", "width", "height"]
}
# check whether user-defined dimensions are provided
if screen_dims and len(screen_dims) == 4:
key_order = ("top", "left", "width", "height")
screen_dims = OrderedDict((k, screen_dims[k]) for k in key_order)
if logging:
logger.debug("Setting Capture-Area dimensions: {}!".format(screen_dims))
else:
screen_dims.clear()
# separately handle colorspace value to int conversion
if colorspace:
self.color_space = capPropId(colorspace.strip())
if logging and not (self.color_space is None):
logger.debug(
"Enabling `{}` colorspace for this video stream!".format(
colorspace.strip()
)
)
else:
self.color_space = None
# intialize mss capture instance
self.__mss_capture_instance = ""
try:
if self.__monitor_instance is None:
if screen_dims:
self.__mss_capture_instance = tuple(screen_dims.values())
# extract global frame from instance
self.frame = np.asanyarray(
self.__capture_object.grab(
bbox=self.__mss_capture_instance,
childprocess=False,
backend=self.__backend,
)
)
else:
if screen_dims:
self.__mss_capture_instance = {
"top": self.__monitor_instance["top"] + screen_dims["top"],
"left": self.__monitor_instance["left"] + screen_dims["left"],
"width": screen_dims["width"],
"height": screen_dims["height"],
"mon": monitor,
}
else:
self.__mss_capture_instance = (
self.__monitor_instance # otherwise create instance from monitor
)
# extract global frame from instance
self.frame = np.asanyarray(
self.__capture_object.grab(self.__mss_capture_instance)
)
# initialize and append to queue
self.__queue.put(self.frame)
except Exception as e:
if isinstance(e, ScreenShotError):
# otherwise catch and log errors
if logging:
logger.exception(self.__capture_object.get_error_details())
raise ValueError(
"[ScreenGear:ERROR] :: ScreenShotError caught, Wrong dimensions passed to python-mss, Kindly Refer Docs!"
)
elif isinstance(e, KeyError):
raise ValueError(
"[ScreenGear:ERROR] :: ScreenShotError caught, Invalid backend: `{}`, Kindly Refer Docs!".format(
backend
)
)
else:
raise SystemError(
"[ScreenGear:ERROR] :: Unable to grab any instance on this system, Are you running headless?"
)
# thread initialization
self.__thread = None
# initialize termination flag
self.__terminate = Event()
def start(self):
"""
Launches the internal *Threaded Frames Extractor* daemon
**Returns:** A reference to the ScreenGear class object.
"""
self.__thread = Thread(target=self.__update, name="ScreenGear", args=())
self.__thread.daemon = True
self.__thread.start()
return self
def __update(self):
"""
A **Threaded Frames Extractor**, that keep iterating frames from `mss` API to a internal monitored deque,
until the thread is terminated, or frames runs out.
"""
# initialize frame variable
frame = None
# keep looping infinitely until the thread is terminated
while not self.__terminate.is_set():
try:
if self.__monitor_instance:
frame = np.asanyarray(
self.__capture_object.grab(self.__mss_capture_instance)
)
else:
frame = np.asanyarray(
self.__capture_object.grab(
bbox=self.__mss_capture_instance,
childprocess=False,
backend=self.__backend,
)
)
if not self.__backend or self.__backend == "pil":
frame = frame[:, :, ::-1]
assert not (
frame is None or np.shape(frame) == ()
), "[ScreenGear:ERROR] :: Failed to retreive any valid frames!"
except Exception as e:
if isinstance(e, ScreenShotError):
raise RuntimeError(self.__capture_object.get_error_details())
else:
logger.exception(str(e))
self.__terminate.set()
continue
if not (self.color_space is None):
# apply colorspace to frames
color_frame = None
try:
if isinstance(self.color_space, int):
color_frame = cv2.cvtColor(frame, self.color_space)
else:
self.__logging and logger.warning(
"Global color_space parameter value `{}` is not a valid!".format(
self.color_space
)
)
self.color_space = None
except Exception as e:
# Catch if any error occurred
self.color_space = None
if self.__logging:
logger.exception(str(e))
logger.warning("Input colorspace is not a valid colorspace!")
if not (color_frame is None):
self.frame = color_frame
else:
self.frame = frame
else:
self.frame = frame
# append to queue
self.__queue.put(self.frame)
# signal queue we're done
self.__queue.put(None)
# indicate immediate termination
self.__terminate.set()
# finally release mss resources
if self.__monitor_instance:
self.__capture_object.close()
def read(self):
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the deque is full.
**Returns:** A n-dimensional numpy array.
"""
# check whether or not termination flag is enabled
while not self.__terminate.is_set():
if self.__queue.empty():
break
return self.__queue.get(timeout=self.__thread_timeout)
# otherwise return NoneType
return None
def stop(self):
"""
Safely terminates the thread, and release the resources.
"""
self.__logging and logger.debug("Terminating ScreenGear Processes.")
# indicate that the thread should be terminate
self.__terminate.set()
# wait until stream resources are released (producer thread might be still grabbing frame)
if self.__thread is not None:
if not (self.__queue is None):
while not self.__queue.empty():
try:
self.__queue.get_nowait()
except queue.Empty:
continue
self.__queue.task_done()
self.__thread.join()
__init__(self, monitor=None, backend='', colorspace=None, logging=False, **options)
special
⚓
This constructor method initializes the object state and attributes of the ScreenGear class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
monitor | int | enables | None |
backend | str | enables | '' |
colorspace | str | selects the colorspace of the input stream. | None |
logging | bool | enables/disables logging. | False |
options | dict | provides the flexibility to manually set the dimensions of capture screen area. | {} |
Source code in vidgear/gears/screengear.py
def __init__(
self, monitor=None, backend="", colorspace=None, logging=False, **options
):
"""
This constructor method initializes the object state and attributes of the ScreenGear class.
Parameters:
monitor (int): enables `mss` backend and sets the index of the monitor screen.
backend (str): enables `pyscreenshot` and select suitable backend for extracting frames.
colorspace (str): selects the colorspace of the input stream.
logging (bool): enables/disables logging.
options (dict): provides the flexibility to manually set the dimensions of capture screen area.
"""
# print current version
logcurr_vidgear_ver(logging=logging)
# raise error(s) for critical Class imports
import_dependency_safe(
"from mss import mss" if mss is None else "", pkg_name="mss"
)
import_dependency_safe("pyscreenshot" if pysct is None else "")
# enable logging if specified:
self.__logging = logging if isinstance(logging, bool) else False
# create monitor instance for the user-defined monitor
self.__monitor_instance = None
self.__backend = ""
if monitor is None:
self.__capture_object = pysct
self.__backend = backend.lower().strip()
else:
self.__capture_object = mss()
if backend.strip():
logger.warning(
"Backends are disabled for Monitor Indexing(monitor>=0)!"
)
try:
self.__monitor_instance = self.__capture_object.monitors[monitor]
except Exception as e:
logger.exception(str(e))
self.__monitor_instance = None
# assigns special parameter to global variable and clear
# Thread Timeout
self.__thread_timeout = options.pop("THREAD_TIMEOUT", None)
if self.__thread_timeout and isinstance(self.__thread_timeout, (int, float)):
# set values
self.__thread_timeout = float(self.__thread_timeout)
else:
# defaults to 5mins timeout
self.__thread_timeout = None
# define deque and assign it to global var
self.__queue = queue.Queue(maxsize=96) # max bufferlen 96 to check overflow
# log it
if logging:
logger.debug("Enabling Threaded Queue Mode by default for ScreenGear!")
if self.__thread_timeout:
logger.debug(
"Setting Video-Thread Timeout to {}s.".format(self.__thread_timeout)
)
# intiate screen dimension handler
screen_dims = {}
# reformat proper mss dict and assign to screen dimension handler
screen_dims = {
k.strip(): v
for k, v in options.items()
if k.strip() in ["top", "left", "width", "height"]
}
# check whether user-defined dimensions are provided
if screen_dims and len(screen_dims) == 4:
key_order = ("top", "left", "width", "height")
screen_dims = OrderedDict((k, screen_dims[k]) for k in key_order)
if logging:
logger.debug("Setting Capture-Area dimensions: {}!".format(screen_dims))
else:
screen_dims.clear()
# separately handle colorspace value to int conversion
if colorspace:
self.color_space = capPropId(colorspace.strip())
if logging and not (self.color_space is None):
logger.debug(
"Enabling `{}` colorspace for this video stream!".format(
colorspace.strip()
)
)
else:
self.color_space = None
# intialize mss capture instance
self.__mss_capture_instance = ""
try:
if self.__monitor_instance is None:
if screen_dims:
self.__mss_capture_instance = tuple(screen_dims.values())
# extract global frame from instance
self.frame = np.asanyarray(
self.__capture_object.grab(
bbox=self.__mss_capture_instance,
childprocess=False,
backend=self.__backend,
)
)
else:
if screen_dims:
self.__mss_capture_instance = {
"top": self.__monitor_instance["top"] + screen_dims["top"],
"left": self.__monitor_instance["left"] + screen_dims["left"],
"width": screen_dims["width"],
"height": screen_dims["height"],
"mon": monitor,
}
else:
self.__mss_capture_instance = (
self.__monitor_instance # otherwise create instance from monitor
)
# extract global frame from instance
self.frame = np.asanyarray(
self.__capture_object.grab(self.__mss_capture_instance)
)
# initialize and append to queue
self.__queue.put(self.frame)
except Exception as e:
if isinstance(e, ScreenShotError):
# otherwise catch and log errors
if logging:
logger.exception(self.__capture_object.get_error_details())
raise ValueError(
"[ScreenGear:ERROR] :: ScreenShotError caught, Wrong dimensions passed to python-mss, Kindly Refer Docs!"
)
elif isinstance(e, KeyError):
raise ValueError(
"[ScreenGear:ERROR] :: ScreenShotError caught, Invalid backend: `{}`, Kindly Refer Docs!".format(
backend
)
)
else:
raise SystemError(
"[ScreenGear:ERROR] :: Unable to grab any instance on this system, Are you running headless?"
)
# thread initialization
self.__thread = None
# initialize termination flag
self.__terminate = Event()
read(self)
⚓
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full.
Returns: A n-dimensional numpy array.
Source code in vidgear/gears/screengear.py
def read(self):
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
and blocks the thread if the deque is full.
**Returns:** A n-dimensional numpy array.
"""
# check whether or not termination flag is enabled
while not self.__terminate.is_set():
if self.__queue.empty():
break
return self.__queue.get(timeout=self.__thread_timeout)
# otherwise return NoneType
return None
start(self)
⚓
Launches the internal Threaded Frames Extractor daemon
Returns: A reference to the ScreenGear class object.
Source code in vidgear/gears/screengear.py
stop(self)
⚓
Safely terminates the thread, and release the resources.
Source code in vidgear/gears/screengear.py
def stop(self):
"""
Safely terminates the thread, and release the resources.
"""
self.__logging and logger.debug("Terminating ScreenGear Processes.")
# indicate that the thread should be terminate
self.__terminate.set()
# wait until stream resources are released (producer thread might be still grabbing frame)
if self.__thread is not None:
if not (self.__queue is None):
while not self.__queue.empty():
try:
self.__queue.get_nowait()
except queue.Empty:
continue
self.__queue.task_done()
self.__thread.join()