Skip to content

ScreenGear API

ScreenGear API usage examples can be found here ➶

ScreenGear API parameters are explained here ➶

ScreenGear is designed exclusively for ultra-fast Screencasting, which means it can grab frames from your monitor in real-time, either by defining an area on the computer screen or full-screen, at the expense of negligible latency. ScreenGear also seamlessly supports frame capturing from multiple monitors, as well as multiple backends.

ScreenGear API implements a multi-threaded wrapper around the pyscreenshot and python-mss Python libraries, and also flexibly supports their internal parameters.

Source code in vidgear/gears/screengear.py
class ScreenGear:
    """
    ScreenGear is designed exclusively for ultra-fast Screencasting, which means it can grab frames from your monitor in real-time, either by defining an area on the computer screen or full-screen,
    at the expense of negligible latency. ScreenGear also seamlessly supports frame capturing from multiple monitors as well as multiple backends.

    ScreenGear API implements a multi-threaded wrapper around the pyscreenshot & python-mss python libraries, and also flexibly supports their internal parameters.
    """

    def __init__(
        self, monitor=None, backend="", colorspace=None, logging=False, **options
    ):
        """
        This constructor method initializes the object state and attributes of the ScreenGear class.

        Parameters:
            monitor (int): enables `mss` backend and sets the index of the monitor screen.
            backend (str): enables `pyscreenshot` and select suitable backend for extracting frames.
            colorspace (str): selects the colorspace of the input stream.
            logging (bool): enables/disables logging.
            options (dict): provides the flexibility to manually set the dimensions of capture screen area
                (`top`, `left`, `width` and `height`; ignored unless all four are given), and accepts
                `THREAD_TIMEOUT` (seconds) to bound how long `read()` waits for a frame.

        Raises:
            ValueError: if grabbing the first frame fails with `ScreenShotError` or `KeyError`.
            SystemError: if grabbing the first frame fails for any other reason.
        """
        # print current version
        logcurr_vidgear_ver(logging=logging)

        # raise error(s) for critical Class imports
        import_dependency_safe(
            "from mss import mss" if mss is None else "", pkg_name="mss"
        )
        import_dependency_safe("pyscreenshot" if pysct is None else "")

        # enable logging if specified (anything but a real bool disables it)
        self.__logging = logging if isinstance(logging, bool) else False

        # select the capture object: `pyscreenshot` when no monitor index
        # is given, otherwise a `mss` instance bound to that monitor
        self.__monitor_instance = None
        self.__backend = ""
        if monitor is None:
            self.__capture_object = pysct
            self.__backend = backend.lower().strip()
        else:
            self.__capture_object = mss()
            if backend.strip():
                logger.warning(
                    "Backends are disabled for Monitor Indexing(monitor>=0)!"
                )
            try:
                self.__monitor_instance = self.__capture_object.monitors[monitor]
            except Exception as e:
                logger.exception(str(e))
                self.__monitor_instance = None

        # extract the special THREAD_TIMEOUT parameter from options
        timeout = options.pop("THREAD_TIMEOUT", None)
        if timeout and isinstance(timeout, (int, float)):
            self.__thread_timeout = float(timeout)
        else:
            # no usable value given: `read()` waits without a time limit
            self.__thread_timeout = None

        # bounded queue between the extractor thread and `read()`
        self.__queue = queue.Queue(maxsize=96)  # max bufferlen 96 to check overflow
        # log it
        if logging:
            logger.debug("Enabling Threaded Queue Mode by default for ScreenGear!")
            if self.__thread_timeout:
                logger.debug(
                    "Setting Video-Thread Timeout to {}s.".format(self.__thread_timeout)
                )

        # collect user-defined capture-area dimensions from the options
        key_order = ("top", "left", "width", "height")
        screen_dims = {
            k.strip(): v for k, v in options.items() if k.strip() in key_order
        }
        # a capture area is honoured only when all four dimensions are given
        if len(screen_dims) == 4:
            screen_dims = OrderedDict((k, screen_dims[k]) for k in key_order)
            if logging:
                logger.debug("Setting Capture-Area dimensions: {}!".format(screen_dims))
        else:
            screen_dims = {}

        # separately handle colorspace value to int conversion
        if colorspace:
            self.color_space = capPropId(colorspace.strip())
            if logging and self.color_space is not None:
                logger.debug(
                    "Enabling `{}` colorspace for this video stream!".format(
                        colorspace.strip()
                    )
                )
        else:
            self.color_space = None

        # build the capture-area descriptor and grab the very first frame
        self.__mss_capture_instance = ""
        try:
            if self.__monitor_instance is None:
                if screen_dims:
                    # NOTE(review): passed as pyscreenshot's `bbox` in
                    # (top, left, width, height) order - confirm this is the
                    # order that library expects.
                    self.__mss_capture_instance = tuple(screen_dims.values())
            elif screen_dims:
                # user offsets are relative to the selected monitor's origin
                self.__mss_capture_instance = {
                    "top": self.__monitor_instance["top"] + screen_dims["top"],
                    "left": self.__monitor_instance["left"] + screen_dims["left"],
                    "width": screen_dims["width"],
                    "height": screen_dims["height"],
                    "mon": monitor,
                }
            else:
                # otherwise capture the whole monitor
                self.__mss_capture_instance = self.__monitor_instance
            # the same grab routine as the extractor thread is used here, so
            # the first frame has the same channel order as all later ones
            self.frame = self.__grab_frame()
            # initialize and append to queue
            self.__queue.put(self.frame)
        except Exception as e:
            if isinstance(e, ScreenShotError):
                # otherwise catch and log errors
                if logging:
                    logger.exception(self.__capture_object.get_error_details())
                raise ValueError(
                    "[ScreenGear:ERROR] :: ScreenShotError caught, Wrong dimensions passed to python-mss, Kindly Refer Docs!"
                ) from e
            elif isinstance(e, KeyError):
                raise ValueError(
                    "[ScreenGear:ERROR] :: ScreenShotError caught, Invalid backend: `{}`, Kindly Refer Docs!".format(
                        backend
                    )
                ) from e
            else:
                raise SystemError(
                    "[ScreenGear:ERROR] :: Unable to grab any instance on this system, Are you running headless?"
                ) from e

        # thread initialization
        self.__thread = None
        # initialize termination flag
        self.__terminate = Event()

    def __grab_frame(self):
        """
        Grabs a single frame from the active capture object.

        **Returns:** The grabbed frame converted with `np.asanyarray`.
        """
        if self.__monitor_instance is None:
            frame = np.asanyarray(
                self.__capture_object.grab(
                    bbox=self.__mss_capture_instance,
                    childprocess=False,
                    backend=self.__backend,
                )
            )
            # reverse the channel axis for the default/`pil` backend
            # (presumably RGB -> BGR for OpenCV consumers - confirm)
            if (not self.__backend or self.__backend == "pil") and np.ndim(
                frame
            ) >= 3:
                frame = frame[:, :, ::-1]
            return frame
        return np.asanyarray(self.__capture_object.grab(self.__mss_capture_instance))

    def start(self):
        """
        Launches the internal *Threaded Frames Extractor* daemon

        **Returns:** A reference to the ScreenGear class object.
        """
        self.__thread = Thread(target=self.__update, name="ScreenGear", args=())
        self.__thread.daemon = True
        self.__thread.start()
        return self

    def __update(self):
        """
        A **Threaded Frames Extractor**, that keeps pushing grabbed frames into the internal bounded queue,
        until the thread is terminated or grabbing fails.

        Whatever ends the loop, a `None` sentinel is queued, the termination flag is set and the
        `mss` instance (if any) is closed.
        """
        try:
            # keep looping until the thread is terminated
            while not self.__terminate.is_set():
                try:
                    frame = self.__grab_frame()
                    # explicit check instead of `assert` (asserts vanish under -O)
                    if frame is None or np.shape(frame) == ():
                        raise RuntimeError(
                            "[ScreenGear:ERROR] :: Failed to retreive any valid frames!"
                        )
                except Exception as e:
                    # An exception raised here would only kill this daemon
                    # thread; nobody could catch it. Log it and shut down
                    # cleanly so that `read()` gets its end-of-stream marker.
                    logger.exception(str(e))
                    if isinstance(e, ScreenShotError):
                        details = getattr(
                            self.__capture_object, "get_error_details", None
                        )
                        if callable(details):
                            logger.error(details())
                    self.__terminate.set()
                    continue

                converted = None
                if self.color_space is not None:
                    # apply colorspace to frames
                    try:
                        if isinstance(self.color_space, int):
                            converted = cv2.cvtColor(frame, self.color_space)
                        else:
                            if self.__logging:
                                logger.warning(
                                    "Global color_space parameter value `{}` is not a valid!".format(
                                        self.color_space
                                    )
                                )
                            self.color_space = None
                    except Exception as e:
                        # conversion failed: stop converting from now on
                        self.color_space = None
                        if self.__logging:
                            logger.exception(str(e))
                            logger.warning(
                                "Input colorspace is not a valid colorspace!"
                            )
                # fall back to the raw frame when nothing was converted
                self.frame = frame if converted is None else converted
                # append to queue (blocks while the queue is full)
                self.__queue.put(self.frame)
        finally:
            # signal queue we're done
            self.__queue.put(None)

            # indicate immediate termination
            self.__terminate.set()

            # finally release mss resources
            if self.__monitor_instance:
                self.__capture_object.close()

    def read(self):
        """
        Extracts the next frame from the internal bounded queue.

        While the extractor thread is alive this waits for a frame (up to `THREAD_TIMEOUT` seconds when
        that option was given) instead of reporting a momentarily empty queue as end-of-stream.

        **Returns:** A n-dimensional numpy array, or `None` once the stream is terminated, the wait
        timed out, or no extractor thread is running and the queue is empty.
        """
        # nothing is handed out once termination is flagged
        if self.__terminate.is_set():
            return None
        worker = self.__thread
        try:
            if worker is None or not worker.is_alive():
                # nobody is producing: only hand out what is already buffered
                return self.__queue.get_nowait()
            return self.__queue.get(timeout=self.__thread_timeout)
        except queue.Empty:
            return None

    def stop(self):
        """
        Safely terminates the thread, and release the resources.
        """
        if self.__logging:
            logger.debug("Terminating ScreenGear Processes.")

        # indicate that the thread should be terminate
        self.__terminate.set()

        # wait until stream resources are released (producer thread might be still grabbing frame)
        if self.__thread is not None:
            if self.__queue is not None:
                # drain the queue so that a producer blocked on a full queue
                # can finish and be joined
                while not self.__queue.empty():
                    try:
                        self.__queue.get_nowait()
                    except queue.Empty:
                        continue
                    self.__queue.task_done()
            self.__thread.join()

__init__(self, monitor=None, backend='', colorspace=None, logging=False, **options) special

This constructor method initializes the object state and attributes of the ScreenGear class.

Parameters:

Name Type Description Default
monitor int

enables mss backend and sets the index of the monitor screen.

None
backend str

enables pyscreenshot and select suitable backend for extracting frames.

''
colorspace str

selects the colorspace of the input stream.

None
logging bool

enables/disables logging.

False
options dict

provides the flexibility to manually set the dimensions of capture screen area.

{}
Source code in vidgear/gears/screengear.py
def __init__(
    self, monitor=None, backend="", colorspace=None, logging=False, **options
):
    """
    This constructor method initializes the object state and attributes of the ScreenGear class.

    Parameters:
        monitor (int): enables `mss` backend and sets the index of the monitor screen.
        backend (str): enables `pyscreenshot` and select suitable backend for extracting frames.
        colorspace (str): selects the colorspace of the input stream.
        logging (bool): enables/disables logging.
        options (dict): provides the flexibility to manually set the dimensions of capture screen area
            (`top`, `left`, `width` and `height`; ignored unless all four are given), and accepts
            `THREAD_TIMEOUT` (seconds).

    Raises:
        ValueError: if grabbing the first frame fails with `ScreenShotError` or `KeyError`.
        SystemError: if grabbing the first frame fails for any other reason.
    """
    # print current version
    logcurr_vidgear_ver(logging=logging)

    # raise error(s) for critical Class imports
    import_dependency_safe(
        "from mss import mss" if mss is None else "", pkg_name="mss"
    )
    import_dependency_safe("pyscreenshot" if pysct is None else "")

    # enable logging if specified (anything but a real bool disables it)
    self.__logging = logging if isinstance(logging, bool) else False

    # select the capture object: `pyscreenshot` when no monitor index
    # is given, otherwise a `mss` instance bound to that monitor
    self.__monitor_instance = None
    self.__backend = ""
    if monitor is None:
        self.__capture_object = pysct
        self.__backend = backend.lower().strip()
    else:
        self.__capture_object = mss()
        if backend.strip():
            logger.warning(
                "Backends are disabled for Monitor Indexing(monitor>=0)!"
            )
        try:
            self.__monitor_instance = self.__capture_object.monitors[monitor]
        except Exception as e:
            logger.exception(str(e))
            self.__monitor_instance = None

    # extract the special THREAD_TIMEOUT parameter from options
    timeout = options.pop("THREAD_TIMEOUT", None)
    if timeout and isinstance(timeout, (int, float)):
        self.__thread_timeout = float(timeout)
    else:
        # no usable value given: no timeout is applied
        self.__thread_timeout = None

    # bounded queue between the extractor thread and `read()`
    self.__queue = queue.Queue(maxsize=96)  # max bufferlen 96 to check overflow
    # log it
    if logging:
        logger.debug("Enabling Threaded Queue Mode by default for ScreenGear!")
        if self.__thread_timeout:
            logger.debug(
                "Setting Video-Thread Timeout to {}s.".format(self.__thread_timeout)
            )

    # collect user-defined capture-area dimensions from the options
    key_order = ("top", "left", "width", "height")
    screen_dims = {
        k.strip(): v for k, v in options.items() if k.strip() in key_order
    }
    # a capture area is honoured only when all four dimensions are given
    if len(screen_dims) == 4:
        screen_dims = OrderedDict((k, screen_dims[k]) for k in key_order)
        if logging:
            logger.debug("Setting Capture-Area dimensions: {}!".format(screen_dims))
    else:
        screen_dims = {}

    # separately handle colorspace value to int conversion
    if colorspace:
        self.color_space = capPropId(colorspace.strip())
        if logging and self.color_space is not None:
            logger.debug(
                "Enabling `{}` colorspace for this video stream!".format(
                    colorspace.strip()
                )
            )
    else:
        self.color_space = None

    # build the capture-area descriptor and grab the very first frame
    self.__mss_capture_instance = ""
    try:
        if self.__monitor_instance is None:
            if screen_dims:
                self.__mss_capture_instance = tuple(screen_dims.values())
            # extract global frame from instance
            first_frame = np.asanyarray(
                self.__capture_object.grab(
                    bbox=self.__mss_capture_instance,
                    childprocess=False,
                    backend=self.__backend,
                )
            )
            # apply the same channel reversal the frames extractor thread
            # applies for the default/`pil` backend, so that the first
            # queued frame matches every later one
            if (not self.__backend or self.__backend == "pil") and np.ndim(
                first_frame
            ) >= 3:
                first_frame = first_frame[:, :, ::-1]
        else:
            if screen_dims:
                # user offsets are relative to the selected monitor's origin
                self.__mss_capture_instance = {
                    "top": self.__monitor_instance["top"] + screen_dims["top"],
                    "left": self.__monitor_instance["left"] + screen_dims["left"],
                    "width": screen_dims["width"],
                    "height": screen_dims["height"],
                    "mon": monitor,
                }
            else:
                # otherwise capture the whole monitor
                self.__mss_capture_instance = self.__monitor_instance
            # extract global frame from instance
            first_frame = np.asanyarray(
                self.__capture_object.grab(self.__mss_capture_instance)
            )
        self.frame = first_frame
        # initialize and append to queue
        self.__queue.put(self.frame)
    except Exception as e:
        if isinstance(e, ScreenShotError):
            # otherwise catch and log errors
            if logging:
                logger.exception(self.__capture_object.get_error_details())
            raise ValueError(
                "[ScreenGear:ERROR] :: ScreenShotError caught, Wrong dimensions passed to python-mss, Kindly Refer Docs!"
            ) from e
        elif isinstance(e, KeyError):
            raise ValueError(
                "[ScreenGear:ERROR] :: ScreenShotError caught, Invalid backend: `{}`, Kindly Refer Docs!".format(
                    backend
                )
            ) from e
        else:
            raise SystemError(
                "[ScreenGear:ERROR] :: Unable to grab any instance on this system, Are you running headless?"
            ) from e

    # thread initialization
    self.__thread = None
    # initialize termination flag
    self.__terminate = Event()

read(self)

Extracts frames synchronously from the monitored queue, which acts as a fixed-length frame buffer in memory; the frames extractor thread blocks while that queue is full.

Returns: An n-dimensional numpy array.

Source code in vidgear/gears/screengear.py
def read(self):
    """
    Extracts the next frame from the internal bounded queue.

    While the extractor thread is alive this waits for a frame (up to `THREAD_TIMEOUT` seconds when
    that option was given) instead of reporting a momentarily empty queue as end-of-stream.

    **Returns:** A n-dimensional numpy array, or `None` once the stream is terminated, the wait
    timed out, or no extractor thread is running and the queue is empty.
    """
    # nothing is handed out once termination is flagged
    if self.__terminate.is_set():
        return None
    worker = self.__thread
    try:
        if worker is None or not worker.is_alive():
            # nobody is producing: only hand out what is already buffered
            return self.__queue.get_nowait()
        return self.__queue.get(timeout=self.__thread_timeout)
    except queue.Empty:
        return None

start(self)

Launches the internal Threaded Frames Extractor daemon

Returns: A reference to the ScreenGear class object.

Source code in vidgear/gears/screengear.py
def start(self):
    """
    Spawns the internal *Threaded Frames Extractor* as a daemon thread and starts it.

    **Returns:** A reference to the ScreenGear class object.
    """
    worker = Thread(target=self.__update, name="ScreenGear", args=())
    # keep a handle on the worker so it can be joined later
    self.__thread = worker
    # daemon thread: it must not keep the interpreter alive on exit
    worker.daemon = True
    worker.start()
    return self

stop(self)

Safely terminates the thread, and release the resources.

Source code in vidgear/gears/screengear.py
def stop(self):
    """
    Flags termination, empties the frame queue and joins the extractor thread.

    The queue is left untouched when no extractor thread was ever created.
    """
    if self.__logging:
        logger.debug("Terminating ScreenGear Processes.")

    # tell the extractor thread to stop
    self.__terminate.set()

    worker = self.__thread
    if worker is None:
        # nothing was started, so there is nothing to wait for
        return

    # discard buffered frames (the producer may still be pushing one in)
    pending = self.__queue
    if pending is not None:
        while not pending.empty():
            try:
                pending.get_nowait()
            except queue.Empty:
                continue
            pending.task_done()
    worker.join()

 


Last update: August 11, 2021