Skip to content

NetGear_Async API

NetGear_Async API usage examples can be found here ➶

NetGear_Async API parameters are explained here ➶

NetGear_Async can generate the same performance as NetGear API at about one-third the memory consumption, and also provide complete server-client handling with various options to use variable protocols/patterns similar to NetGear, but lacks in term of flexibility as it supports only a few NetGear's Exclusive Modes.

NetGear_Async is built on zmq.asyncio, and powered by a high-performance asyncio event loop called uvloop to achieve unwatchable high-speed and lag-free video streaming over the network with minimal resource constraints. NetGear_Async can transfer thousands of frames in just a few seconds without causing any significant load on your system.

NetGear_Async provides complete server-client handling and options to use variable protocols/patterns similar to NetGear API. Furthermore, NetGear_Async allows us to define our custom Server as source to transform frames easily before sending them across the network.

NetGear_Async now supports additional bidirectional data transmission between receiver(client) and sender(server) while transferring frames. Users can easily build complex applications such as like Real-Time Video Chat in just few lines of code.

In addition to all this, NetGear_Async API also provides internal wrapper around VideoGear, which itself provides internal access to both CamGear and PiGear APIs, thereby granting it exclusive power for transferring frames incoming from any source to the network.

NetGear_Async as of now supports four ZeroMQ messaging patterns:

  • zmq.PAIR (ZMQ Pair Pattern)
  • zmq.REQ/zmq.REP (ZMQ Request/Reply Pattern)
  • zmq.PUB/zmq.SUB (ZMQ Publish/Subscribe Pattern)
  • zmq.PUSH/zmq.PULL (ZMQ Push/Pull Pattern)

Whereas supported protocol are: tcp and ipc.

Source code in vidgear/gears/asyncio/netgear_async.py
class NetGear_Async:
    """
    NetGear_Async can generate the same performance as NetGear API at about one-third the memory consumption, and also provide complete server-client handling with various
    options to use variable protocols/patterns similar to NetGear, but lacks in term of flexibility as it supports only a few NetGear's Exclusive Modes.

    NetGear_Async is built on `zmq.asyncio`, and powered by a high-performance asyncio event loop called uvloop to achieve unwatchable high-speed and lag-free video streaming
    over the network with minimal resource constraints. NetGear_Async can transfer thousands of frames in just a few seconds without causing any significant load on your
    system.

    NetGear_Async provides complete server-client handling and options to use variable protocols/patterns similar to NetGear API. Furthermore, NetGear_Async allows us to define
     our custom Server as source to transform frames easily before sending them across the network.

    NetGear_Async now supports additional **bidirectional data transmission** between receiver(client) and sender(server) while transferring frames.
    Users can easily build complex applications such as like _Real-Time Video Chat_ in just few lines of code.

    In addition to all this, NetGear_Async API also provides internal wrapper around VideoGear, which itself provides internal access to both CamGear and PiGear APIs, thereby
    granting it exclusive power for transferring frames incoming from any source to the network.

    NetGear_Async as of now supports four ZeroMQ messaging patterns:

    - `zmq.PAIR` _(ZMQ Pair Pattern)_
    - `zmq.REQ/zmq.REP` _(ZMQ Request/Reply Pattern)_
    - `zmq.PUB/zmq.SUB` _(ZMQ Publish/Subscribe Pattern)_
    - `zmq.PUSH/zmq.PULL` _(ZMQ Push/Pull Pattern)_

    Whereas supported protocol are: `tcp` and `ipc`.
    """

    def __init__(
        self,
        # NetGear_Async parameters
        address=None,
        port=None,
        protocol="tcp",
        pattern=0,
        receive_mode=False,
        timeout=0.0,
        # Videogear parameters
        enablePiCamera=False,
        stabilize=False,
        source=None,
        camera_num=0,
        stream_mode=False,
        backend=0,
        colorspace=None,
        resolution=(640, 480),
        framerate=25,
        time_delay=0,
        # common parameters
        logging=False,
        **options
    ):

        """
        This constructor method initializes the object state and attributes of the NetGear_Async class.

        Parameters:
            address (str): sets the valid network address of the Server/Client.
            port (str): sets the valid Network Port of the Server/Client.
            protocol (str): sets the valid messaging protocol between Server/Client.
            pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
            receive_mode (bool): select the NetGear_Async's Mode of operation.
            timeout (int/float): controls the maximum waiting time(in sec) after which Client throws `TimeoutError`.
            enablePiCamera (bool): provide access to PiGear(if True) or CamGear(if False) APIs respectively.
            stabilize (bool): enable access to Stabilizer Class for stabilizing frames.
            camera_num (int): selects the camera module index which will be used as Rpi source.
            resolution (tuple): sets the resolution (i.e. `(width,height)`) of the Rpi source.
            framerate (int/float): sets the framerate of the Rpi source.
            source (based on input): defines the source for the input stream.
            stream_mode (bool): controls the exclusive YouTube Mode.
            backend (int): selects the backend for OpenCV's VideoCapture class.
            colorspace (str): selects the colorspace of the input stream.
            logging (bool): enables/disables logging.
            time_delay (int): time delay (in sec) before start reading the frames.
            options (dict): provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.
        """
        # raise error(s) for critical Class imports
        import_dependency_safe("zmq" if zmq is None else "", min_version="4.0")
        import_dependency_safe("msgpack" if msgpack is None else "")
        import_dependency_safe("msgpack_numpy" if m is None else "")

        # enable logging if specified
        self.__logging = logging

        # define valid messaging patterns => `0`: PAIR, `1`:(REQ, REP), `2`:(SUB, PUB), `3`:(PUSH, PULL)
        valid_messaging_patterns = {
            0: (zmq.PAIR, zmq.PAIR),
            1: (zmq.REQ, zmq.REP),
            2: (zmq.PUB, zmq.SUB),
            3: (zmq.PUSH, zmq.PULL),
        }

        # check whether user-defined messaging pattern is valid
        if isinstance(pattern, int) and pattern in valid_messaging_patterns:
            # assign value
            self.__msg_pattern = pattern
            self.__pattern = valid_messaging_patterns[pattern]
        else:
            # otherwise default to 0:`zmq.PAIR`
            self.__msg_pattern = 0
            self.__pattern = valid_messaging_patterns[self.__msg_pattern]
            if self.__logging:
                logger.warning(
                    "Invalid pattern {pattern}. Defaulting to `zmq.PAIR`!".format(
                        pattern=pattern
                    )
                )

        # check  whether user-defined messaging protocol is valid
        if isinstance(protocol, str) and protocol in ["tcp", "ipc"]:
            # assign value
            self.__protocol = protocol
        else:
            # else default to `tcp` protocol
            self.__protocol = "tcp"
            if self.__logging:
                logger.warning("Invalid protocol. Defaulting to `tcp`!")

        # initialize Termination flag
        self.__terminate = False
        # initialize and assign `Receive Mode`
        self.__receive_mode = receive_mode
        # initialize stream handler
        self.__stream = None
        # initialize Messaging Socket
        self.__msg_socket = None
        # initialize NetGear_Async's configuration dictionary
        self.config = {}
        # asyncio queue handler
        self.__queue = None
        # define Bidirectional mode
        self.__bi_mode = False  # handles Bidirectional mode state

        # assign timeout for Receiver end
        if timeout and isinstance(timeout, (int, float)):
            self.__timeout = float(timeout)
        else:
            self.__timeout = 15.0

        # generate 8-digit random system id
        self.__id = "".join(
            secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
        )

        # Handle user-defined options dictionary values
        # reformat dictionary
        options = {str(k).strip(): v for k, v in options.items()}
        # handle bidirectional mode
        if "bidirectional_mode" in options:
            value = options["bidirectional_mode"]
            # also check if pattern and source is valid
            if isinstance(value, bool) and pattern < 2 and source is None:
                # activate Bidirectional mode if specified
                self.__bi_mode = value
            else:
                # otherwise disable it
                self.__bi_mode = False
                logger.warning("Bidirectional data transmission is disabled!")
            # handle errors and logging
            if pattern >= 2:
                # raise error
                raise ValueError(
                    "[NetGear_Async:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )
            elif not (source is None):
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Custom source must be used when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )
            elif isinstance(value, bool) and self.__logging:
                # log Bidirectional mode activation
                logger.debug(
                    "Bidirectional Data Transmission is {} for this connection!".format(
                        "enabled" if value else "disabled"
                    )
                )
            else:
                logger.error("`bidirectional_mode` value is invalid!")
            # clean
            del options["bidirectional_mode"]

        # define messaging asynchronous Context
        self.__msg_context = zmq.asyncio.Context()

        # check whether `Receive Mode` is enabled
        if receive_mode:
            # assign local IP address if None
            if address is None:
                self.__address = "*"  # define address
            else:
                self.__address = address
            # assign default port address if None
            if port is None:
                self.__port = "5555"
            else:
                self.__port = port
        else:
            # Handle video source
            if source is None:
                self.config = {"generator": None}
                if self.__logging:
                    logger.warning("Given source is of NoneType!")
            else:
                # define stream with necessary params
                self.__stream = VideoGear(
                    enablePiCamera=enablePiCamera,
                    stabilize=stabilize,
                    source=source,
                    camera_num=camera_num,
                    stream_mode=stream_mode,
                    backend=backend,
                    colorspace=colorspace,
                    resolution=resolution,
                    framerate=framerate,
                    logging=logging,
                    time_delay=time_delay,
                    **options
                )
                # define default frame generator in configuration
                self.config = {"generator": self.__frame_generator()}
            # assign local ip address if None
            if address is None:
                self.__address = "localhost"
            else:
                self.__address = address
            # assign default port address if None
            if port is None:
                self.__port = "5555"
            else:
                self.__port = port
            # add server task handler
            self.task = None

        # Setup and assign event loop policy
        if platform.system() == "Windows":
            # On Windows, VidGear requires the ``WindowsSelectorEventLoop``, and this is
            # the default in Python 3.7 and older, but new Python 3.8, defaults to an
            # event loop that is not compatible with it. Thereby, we had to set it manually.
            if sys.version_info[:2] >= (3, 8):
                asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        else:
            if not (uvloop is None):
                # Latest uvloop eventloop is only available for UNIX machines.
                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            else:
                # log if not present
                import_dependency_safe("uvloop", error="log")

        # Retrieve event loop and assign it
        self.loop = asyncio.get_event_loop()
        # create asyncio queue if bidirectional mode activated
        self.__queue = asyncio.Queue() if self.__bi_mode else None
        # log eventloop for debugging
        if self.__logging:
            # debugging
            logger.info(
                "Using `{}` event loop for this process.".format(
                    self.loop.__class__.__name__
                )
            )

    def launch(self):
        """
        Launches an asynchronous generators and loop executors for respective task.
        """
        # check if receive mode enabled
        if self.__receive_mode:
            if self.__logging:
                logger.debug("Launching NetGear_Async asynchronous generator!")
            # run loop executor for Receiver asynchronous generator
            self.loop.run_in_executor(None, self.recv_generator)
        else:
            # Otherwise launch Server handler
            if self.__logging:
                logger.debug("Creating NetGear_Async asynchronous server handler!")
            # create task for Server Handler
            self.task = asyncio.ensure_future(self.__server_handler())
        # return instance
        return self

    async def __server_handler(self):
        """
        Handles various Server-end processes/tasks.
        """
        # validate assigned frame generator in NetGear_Async configuration
        if isinstance(self.config, dict) and "generator" in self.config:
            # check if its  assigned value is a asynchronous generator
            if self.config["generator"] is None or not inspect.isasyncgen(
                self.config["generator"]
            ):
                # otherwise raise error
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Invalid configuration. Assigned generator must be a asynchronous generator function/method only!"
                )
        else:
            # raise error if validation fails
            raise RuntimeError(
                "[NetGear_Async:ERROR] :: Assigned NetGear_Async configuration is invalid!"
            )

        # define our messaging socket
        self.__msg_socket = self.__msg_context.socket(self.__pattern[0])

        # if req/rep pattern, define additional flags
        if self.__msg_pattern == 1:
            self.__msg_socket.REQ_RELAXED = True
            self.__msg_socket.REQ_CORRELATE = True

        # if pub/sub pattern, define additional optimizer
        if self.__msg_pattern == 2:
            self.__msg_socket.set_hwm(1)

        # try connecting socket to assigned protocol, address and port
        try:
            self.__msg_socket.connect(
                self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
            )
            # finally log if successful
            if self.__logging:
                logger.debug(
                    "Successfully connected to address: {} with pattern: {}.".format(
                        (
                            self.__protocol
                            + "://"
                            + str(self.__address)
                            + ":"
                            + str(self.__port)
                        ),
                        self.__msg_pattern,
                    )
                )
            logger.critical(
                "Send Mode is successfully activated and ready to send data!"
            )
        except Exception as e:
            # log ad raise error if failed
            logger.exception(str(e))
            if self.__bi_mode:
                logger.error(
                    "Failed to activate Bidirectional Mode for this connection!"
                )
            raise ValueError(
                "[NetGear_Async:ERROR] :: Failed to connect address: {} and pattern: {}!".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )

        # loop over our Asynchronous frame generator
        async for dataframe in self.config["generator"]:
            # extract data if bidirectional mode
            if self.__bi_mode and len(dataframe) == 2:
                (data, frame) = dataframe
                if not (data is None) and isinstance(data, np.ndarray):
                    logger.warning(
                        "Skipped unsupported `data` of datatype: {}!".format(
                            type(data).__name__
                        )
                    )
                    data = None
                assert isinstance(
                    frame, np.ndarray
                ), "[NetGear_Async:ERROR] :: Invalid data received from server end!"
            elif self.__bi_mode:
                # raise error for invalid data
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Send Mode only accepts tuple(data, frame) as input in Bidirectional Mode. \
                    Kindly refer vidgear docs!"
                )
            else:
                # otherwise just make a copy of frame
                frame = np.copy(dataframe)
                data = None

            # check if retrieved frame is `CONTIGUOUS`
            if not (frame.flags["C_CONTIGUOUS"]):
                # otherwise make it
                frame = np.ascontiguousarray(frame, dtype=frame.dtype)

            # create data dict
            data_dict = dict(
                terminate=False,
                bi_mode=self.__bi_mode,
                data=data if not (data is None) else "",
            )
            # encode it
            data_enc = msgpack.packb(data_dict)
            # send the encoded data with correct flags
            await self.__msg_socket.send(data_enc, flags=zmq.SNDMORE)

            # encode frame
            frame_enc = msgpack.packb(frame, default=m.encode)
            # send the encoded frame
            await self.__msg_socket.send_multipart([frame_enc])

            # check if bidirectional patterns used
            if self.__msg_pattern < 2:
                # handle bidirectional data transfer if enabled
                if self.__bi_mode:
                    # get receiver encoded message withing timeout limit
                    recvdmsg_encoded = await asyncio.wait_for(
                        self.__msg_socket.recv(), timeout=self.__timeout
                    )
                    # retrieve receiver data from encoded message
                    recvd_data = msgpack.unpackb(recvdmsg_encoded, use_list=False)
                    # check message type
                    if recvd_data["return_type"] == "ndarray":  # numpy.ndarray
                        # get encoded frame from receiver
                        recvdframe_encoded = await asyncio.wait_for(
                            self.__msg_socket.recv_multipart(), timeout=self.__timeout
                        )
                        # retrieve frame and put in queue
                        await self.__queue.put(
                            msgpack.unpackb(
                                recvdframe_encoded[0],
                                use_list=False,
                                object_hook=m.decode,
                            )
                        )
                    else:
                        # otherwise put data directly in queue
                        await self.__queue.put(
                            recvd_data["return_data"]
                            if recvd_data["return_data"]
                            else None
                        )
                else:
                    # otherwise log received confirmation
                    recv_confirmation = await asyncio.wait_for(
                        self.__msg_socket.recv(), timeout=self.__timeout
                    )
                    if self.__logging:
                        logger.debug(recv_confirmation)

    async def recv_generator(self):
        """
        A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
        """
        # check whether `receive mode` is activated
        if not (self.__receive_mode):
            # raise Value error and exit
            self.__terminate = True
            raise ValueError(
                "[NetGear_Async:ERROR] :: `recv_generator()` function cannot be accessed while `receive_mode` is disabled. Kindly refer vidgear docs!"
            )

        # initialize and define messaging socket
        self.__msg_socket = self.__msg_context.socket(self.__pattern[1])

        # define exclusive socket options for patterns
        if self.__msg_pattern == 2:
            self.__msg_socket.set_hwm(1)
            self.__msg_socket.setsockopt(zmq.SUBSCRIBE, b"")

        try:
            # bind socket to the assigned protocol, address and port
            self.__msg_socket.bind(
                self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
            )
            # finally log progress
            if self.__logging:
                logger.debug(
                    "Successfully binded to address: {} with pattern: {}.".format(
                        (
                            self.__protocol
                            + "://"
                            + str(self.__address)
                            + ":"
                            + str(self.__port)
                        ),
                        self.__msg_pattern,
                    )
                )
            logger.critical("Receive Mode is activated successfully!")
        except Exception as e:
            logger.exception(str(e))
            raise RuntimeError(
                "[NetGear_Async:ERROR] :: Failed to bind address: {} and pattern: {}{}!".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                    " and Bidirectional Mode enabled" if self.__bi_mode else "",
                )
            )

        # loop until terminated
        while not self.__terminate:
            # get encoded data message from server withing timeout limit
            datamsg_encoded = await asyncio.wait_for(
                self.__msg_socket.recv(), timeout=self.__timeout
            )
            # retrieve data from message
            data = msgpack.unpackb(datamsg_encoded, use_list=False)
            # terminate if exit` flag received from server
            if data["terminate"]:
                # send confirmation message to server if bidirectional patterns
                if self.__msg_pattern < 2:
                    # create termination confirmation message
                    return_dict = dict(
                        terminated="Client-`{}` successfully terminated!".format(
                            self.__id
                        ),
                    )
                    # encode message
                    retdata_enc = msgpack.packb(return_dict)
                    # send message back to server
                    await self.__msg_socket.send(retdata_enc)
                if self.__logging:
                    logger.info("Termination signal received from server!")
                # break loop and terminate
                self.__terminate = True
                break
            # get encoded frame message from server withing timeout limit
            framemsg_encoded = await asyncio.wait_for(
                self.__msg_socket.recv_multipart(), timeout=self.__timeout
            )
            # retrieve frame from message
            frame = msgpack.unpackb(
                framemsg_encoded[0], use_list=False, object_hook=m.decode
            )

            # check if bidirectional patterns
            if self.__msg_pattern < 2:
                # handle bidirectional data transfer if enabled
                if self.__bi_mode and data["bi_mode"]:
                    # handle empty queue
                    if not self.__queue.empty():
                        return_data = await self.__queue.get()
                        self.__queue.task_done()
                    else:
                        return_data = None
                    # check if we are returning `ndarray` frames
                    if not (return_data is None) and isinstance(
                        return_data, np.ndarray
                    ):
                        # check whether the incoming frame is contiguous
                        if not (return_data.flags["C_CONTIGUOUS"]):
                            return_data = np.ascontiguousarray(
                                return_data, dtype=return_data.dtype
                            )

                        # create return type dict without data
                        rettype_dict = dict(
                            return_type=(type(return_data).__name__),
                            return_data=None,
                        )
                        # encode it
                        rettype_enc = msgpack.packb(rettype_dict)
                        # send it to server with correct flags
                        await self.__msg_socket.send(rettype_enc, flags=zmq.SNDMORE)

                        # encode return ndarray data
                        retframe_enc = msgpack.packb(return_data, default=m.encode)
                        # send it over network to server
                        await self.__msg_socket.send_multipart([retframe_enc])
                    else:
                        # otherwise create type and data dict
                        return_dict = dict(
                            return_type=(type(return_data).__name__),
                            return_data=return_data
                            if not (return_data is None)
                            else "",
                        )
                        # encode it
                        retdata_enc = msgpack.packb(return_dict)
                        # send it over network to server
                        await self.__msg_socket.send(retdata_enc)
                elif self.__bi_mode or data["bi_mode"]:
                    # raise error if bidirectional mode is disabled at server or client but not both
                    raise RuntimeError(
                        "[NetGear_Async:ERROR] :: Invalid configuration! Bidirectional Mode is not activate on {} end.".format(
                            "client" if self.__bi_mode else "server"
                        )
                    )
                else:
                    # otherwise just send confirmation message to server
                    await self.__msg_socket.send(
                        bytes(
                            "Data received on client: {} !".format(self.__id), "utf-8"
                        )
                    )
            # yield received tuple(data-frame) if bidirectional mode or else just frame
            if self.__bi_mode:
                yield (data["data"], frame) if data["data"] else (None, frame)
            else:
                yield frame
            # sleep for sometime
            await asyncio.sleep(0)

    async def __frame_generator(self):
        """
        Returns a default frame-generator for NetGear_Async's Server Handler.
        """
        # start stream
        self.__stream.start()
        # loop over stream until its terminated
        while not self.__terminate:
            # read frames
            frame = self.__stream.read()
            # break if NoneType
            if frame is None:
                break
            # yield frame
            yield frame
            # sleep for sometime
            await asyncio.sleep(0)

    async def transceive_data(self, data=None):
        """
        Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.

        Parameters:
            data (any): inputs data _(of any datatype)_ for sending back to Server.
        """
        recvd_data = None
        if not self.__terminate:
            if self.__bi_mode:
                if self.__receive_mode:
                    await self.__queue.put(data)
                else:
                    if not self.__queue.empty():
                        recvd_data = await self.__queue.get()
                        self.__queue.task_done()
            else:
                logger.error(
                    "`transceive_data()` function cannot be used when Bidirectional Mode is disabled."
                )
        return recvd_data

    async def __terminate_connection(self, disable_confirmation=False):
        """
        Internal asyncio method to safely terminate ZMQ connection and queues

        Parameters:
            disable_confirmation (boolean): Force disable termination confirmation from client in bidirectional patterns.
        """
        # log termination
        if self.__logging:
            logger.debug(
                "Terminating various {} Processes. Please wait.".format(
                    "Receive Mode" if self.__receive_mode else "Send Mode"
                )
            )

        # check whether `receive_mode` is enabled or not
        if self.__receive_mode:
            # indicate that process should be terminated
            self.__terminate = True
        else:
            # indicate that process should be terminated
            self.__terminate = True
            # terminate stream
            if not (self.__stream is None):
                self.__stream.stop()
            # signal `exit` flag for termination!
            data_dict = dict(terminate=True)
            data_enc = msgpack.packb(data_dict)
            await self.__msg_socket.send(data_enc)
            # check if bidirectional patterns
            if self.__msg_pattern < 2 and not disable_confirmation:
                # then receive and log confirmation
                recv_confirmation = await self.__msg_socket.recv()
                recvd_conf = msgpack.unpackb(recv_confirmation, use_list=False)
                if self.__logging and "terminated" in recvd_conf:
                    logger.debug(recvd_conf["terminated"])
        # close socket
        self.__msg_socket.setsockopt(zmq.LINGER, 0)
        self.__msg_socket.close()
        # handle asyncio queues in bidirectional mode
        if self.__bi_mode:
            # empty queue if not
            while not self.__queue.empty():
                try:
                    self.__queue.get_nowait()
                except asyncio.QueueEmpty:
                    continue
                self.__queue.task_done()
            # join queues
            await self.__queue.join()

        logger.critical(
            "{} successfully terminated!".format(
                "Receive Mode" if self.__receive_mode else "Send Mode"
            )
        )

    def close(self, skip_loop=False):
        """
        Terminates all NetGear_Async Asynchronous processes gracefully.

        Parameters:
            skip_loop (Boolean): (optional)used only if don't want to close eventloop(required in pytest).
        """
        # close event loop if specified
        if not (skip_loop):
            # close connection gracefully
            self.loop.run_until_complete(self.__terminate_connection())
            self.loop.close()
        else:
            # otherwise create a task
            asyncio.ensure_future(
                self.__terminate_connection(disable_confirmation=True)
            )

__init__(self, address=None, port=None, protocol='tcp', pattern=0, receive_mode=False, timeout=0.0, enablePiCamera=False, stabilize=False, source=None, camera_num=0, stream_mode=False, backend=0, colorspace=None, resolution=(640, 480), framerate=25, time_delay=0, logging=False, **options) special

This constructor method initializes the object state and attributes of the NetGear_Async class.

Parameters:

Name Type Description Default
address str

sets the valid network address of the Server/Client.

None
port str

sets the valid Network Port of the Server/Client.

None
protocol str

sets the valid messaging protocol between Server/Client.

'tcp'
pattern int

sets the supported messaging pattern(flow of communication) between Server/Client

0
receive_mode bool

select the NetGear_Async's Mode of operation.

False
timeout int/float

controls the maximum waiting time(in sec) after which Client throws TimeoutError.

0.0
enablePiCamera bool

provide access to PiGear(if True) or CamGear(if False) APIs respectively.

False
stabilize bool

enable access to Stabilizer Class for stabilizing frames.

False
camera_num int

selects the camera module index which will be used as Rpi source.

0
resolution tuple

sets the resolution (i.e. (width,height)) of the Rpi source.

(640, 480)
framerate int/float

sets the framerate of the Rpi source.

25
source based on input

defines the source for the input stream.

None
stream_mode bool

controls the exclusive YouTube Mode.

False
backend int

selects the backend for OpenCV's VideoCapture class.

0
colorspace str

selects the colorspace of the input stream.

None
logging bool

enables/disables logging.

False
time_delay int

time delay (in sec) before start reading the frames.

0
options dict

provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.

{}
Source code in vidgear/gears/asyncio/netgear_async.py
def __init__(
    self,
    # NetGear_Async parameters
    address=None,
    port=None,
    protocol="tcp",
    pattern=0,
    receive_mode=False,
    timeout=0.0,
    # Videogear parameters
    enablePiCamera=False,
    stabilize=False,
    source=None,
    camera_num=0,
    stream_mode=False,
    backend=0,
    colorspace=None,
    resolution=(640, 480),
    framerate=25,
    time_delay=0,
    # common parameters
    logging=False,
    **options
):

    """
    This constructor method initializes the object state and attributes of the NetGear_Async class.

    Parameters:
        address (str): sets the valid network address of the Server/Client.
        port (str): sets the valid Network Port of the Server/Client.
        protocol (str): sets the valid messaging protocol between Server/Client.
        pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
        receive_mode (bool): select the NetGear_Async's Mode of operation.
        timeout (int/float): controls the maximum waiting time(in sec) after which Client throws `TimeoutError`.
        enablePiCamera (bool): provide access to PiGear(if True) or CamGear(if False) APIs respectively.
        stabilize (bool): enable access to Stabilizer Class for stabilizing frames.
        camera_num (int): selects the camera module index which will be used as Rpi source.
        resolution (tuple): sets the resolution (i.e. `(width,height)`) of the Rpi source.
        framerate (int/float): sets the framerate of the Rpi source.
        source (based on input): defines the source for the input stream.
        stream_mode (bool): controls the exclusive YouTube Mode.
        backend (int): selects the backend for OpenCV's VideoCapture class.
        colorspace (str): selects the colorspace of the input stream.
        logging (bool): enables/disables logging.
        time_delay (int): time delay (in sec) before start reading the frames.
        options (dict): provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.
    """
    # raise error(s) for critical Class imports
    import_dependency_safe("zmq" if zmq is None else "", min_version="4.0")
    import_dependency_safe("msgpack" if msgpack is None else "")
    import_dependency_safe("msgpack_numpy" if m is None else "")

    # enable logging if specified
    self.__logging = logging

    # define valid messaging patterns => `0`: PAIR, `1`:(REQ, REP), `2`:(SUB, PUB), `3`:(PUSH, PULL)
    valid_messaging_patterns = {
        0: (zmq.PAIR, zmq.PAIR),
        1: (zmq.REQ, zmq.REP),
        2: (zmq.PUB, zmq.SUB),
        3: (zmq.PUSH, zmq.PULL),
    }

    # check whether user-defined messaging pattern is valid
    if isinstance(pattern, int) and pattern in valid_messaging_patterns:
        # assign value
        self.__msg_pattern = pattern
        self.__pattern = valid_messaging_patterns[pattern]
    else:
        # otherwise default to 0:`zmq.PAIR`
        self.__msg_pattern = 0
        self.__pattern = valid_messaging_patterns[self.__msg_pattern]
        if self.__logging:
            logger.warning(
                "Invalid pattern {pattern}. Defaulting to `zmq.PAIR`!".format(
                    pattern=pattern
                )
            )

    # check  whether user-defined messaging protocol is valid
    if isinstance(protocol, str) and protocol in ["tcp", "ipc"]:
        # assign value
        self.__protocol = protocol
    else:
        # else default to `tcp` protocol
        self.__protocol = "tcp"
        if self.__logging:
            logger.warning("Invalid protocol. Defaulting to `tcp`!")

    # initialize Termination flag
    self.__terminate = False
    # initialize and assign `Receive Mode`
    self.__receive_mode = receive_mode
    # initialize stream handler
    self.__stream = None
    # initialize Messaging Socket
    self.__msg_socket = None
    # initialize NetGear_Async's configuration dictionary
    self.config = {}
    # asyncio queue handler
    self.__queue = None
    # define Bidirectional mode
    self.__bi_mode = False  # handles Bidirectional mode state

    # assign timeout for Receiver end
    if timeout and isinstance(timeout, (int, float)):
        self.__timeout = float(timeout)
    else:
        self.__timeout = 15.0

    # generate 8-digit random system id
    self.__id = "".join(
        secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
    )

    # Handle user-defined options dictionary values
    # reformat dictionary
    options = {str(k).strip(): v for k, v in options.items()}
    # handle bidirectional mode
    if "bidirectional_mode" in options:
        value = options["bidirectional_mode"]
        # also check if pattern and source is valid
        if isinstance(value, bool) and pattern < 2 and source is None:
            # activate Bidirectional mode if specified
            self.__bi_mode = value
        else:
            # otherwise disable it
            self.__bi_mode = False
            logger.warning("Bidirectional data transmission is disabled!")
        # handle errors and logging
        if pattern >= 2:
            # raise error
            raise ValueError(
                "[NetGear_Async:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                    pattern
                )
            )
        elif not (source is None):
            raise ValueError(
                "[NetGear_Async:ERROR] :: Custom source must be used when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                    pattern
                )
            )
        elif isinstance(value, bool) and self.__logging:
            # log Bidirectional mode activation
            logger.debug(
                "Bidirectional Data Transmission is {} for this connection!".format(
                    "enabled" if value else "disabled"
                )
            )
        else:
            logger.error("`bidirectional_mode` value is invalid!")
        # clean
        del options["bidirectional_mode"]

    # define messaging asynchronous Context
    self.__msg_context = zmq.asyncio.Context()

    # check whether `Receive Mode` is enabled
    if receive_mode:
        # assign local IP address if None
        if address is None:
            self.__address = "*"  # define address
        else:
            self.__address = address
        # assign default port address if None
        if port is None:
            self.__port = "5555"
        else:
            self.__port = port
    else:
        # Handle video source
        if source is None:
            self.config = {"generator": None}
            if self.__logging:
                logger.warning("Given source is of NoneType!")
        else:
            # define stream with necessary params
            self.__stream = VideoGear(
                enablePiCamera=enablePiCamera,
                stabilize=stabilize,
                source=source,
                camera_num=camera_num,
                stream_mode=stream_mode,
                backend=backend,
                colorspace=colorspace,
                resolution=resolution,
                framerate=framerate,
                logging=logging,
                time_delay=time_delay,
                **options
            )
            # define default frame generator in configuration
            self.config = {"generator": self.__frame_generator()}
        # assign local ip address if None
        if address is None:
            self.__address = "localhost"
        else:
            self.__address = address
        # assign default port address if None
        if port is None:
            self.__port = "5555"
        else:
            self.__port = port
        # add server task handler
        self.task = None

    # Setup and assign event loop policy
    if platform.system() == "Windows":
        # On Windows, VidGear requires the ``WindowsSelectorEventLoop``, and this is
        # the default in Python 3.7 and older, but new Python 3.8, defaults to an
        # event loop that is not compatible with it. Thereby, we had to set it manually.
        if sys.version_info[:2] >= (3, 8):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    else:
        if not (uvloop is None):
            # Latest uvloop eventloop is only available for UNIX machines.
            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        else:
            # log if not present
            import_dependency_safe("uvloop", error="log")

    # Retrieve event loop and assign it
    self.loop = asyncio.get_event_loop()
    # create asyncio queue if bidirectional mode activated
    self.__queue = asyncio.Queue() if self.__bi_mode else None
    # log eventloop for debugging
    if self.__logging:
        # debugging
        logger.info(
            "Using `{}` event loop for this process.".format(
                self.loop.__class__.__name__
            )
        )

close(self, skip_loop=False)

Terminates all NetGear_Async Asynchronous processes gracefully.

Parameters:

Name Type Description Default
skip_loop Boolean

(optional)used only if don't want to close eventloop(required in pytest).

False
Source code in vidgear/gears/asyncio/netgear_async.py
def close(self, skip_loop=False):
    """
    Terminates all NetGear_Async Asynchronous processes gracefully.

    Parameters:
        skip_loop (Boolean): (optional)used only if don't want to close eventloop(required in pytest).
    """
    # close event loop if specified
    if not (skip_loop):
        # close connection gracefully
        self.loop.run_until_complete(self.__terminate_connection())
        self.loop.close()
    else:
        # otherwise create a task
        asyncio.ensure_future(
            self.__terminate_connection(disable_confirmation=True)
        )

launch(self)

Launches an asynchronous generators and loop executors for respective task.

Source code in vidgear/gears/asyncio/netgear_async.py
def launch(self):
    """
    Launches an asynchronous generators and loop executors for respective task.
    """
    # check if receive mode enabled
    if self.__receive_mode:
        if self.__logging:
            logger.debug("Launching NetGear_Async asynchronous generator!")
        # run loop executor for Receiver asynchronous generator
        self.loop.run_in_executor(None, self.recv_generator)
    else:
        # Otherwise launch Server handler
        if self.__logging:
            logger.debug("Creating NetGear_Async asynchronous server handler!")
        # create task for Server Handler
        self.task = asyncio.ensure_future(self.__server_handler())
    # return instance
    return self

recv_generator(self)

A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.

Source code in vidgear/gears/asyncio/netgear_async.py
async def recv_generator(self):
    """
    A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
    """
    # check whether `receive mode` is activated
    if not (self.__receive_mode):
        # raise Value error and exit
        self.__terminate = True
        raise ValueError(
            "[NetGear_Async:ERROR] :: `recv_generator()` function cannot be accessed while `receive_mode` is disabled. Kindly refer vidgear docs!"
        )

    # initialize and define messaging socket
    self.__msg_socket = self.__msg_context.socket(self.__pattern[1])

    # define exclusive socket options for patterns
    if self.__msg_pattern == 2:
        self.__msg_socket.set_hwm(1)
        self.__msg_socket.setsockopt(zmq.SUBSCRIBE, b"")

    try:
        # bind socket to the assigned protocol, address and port
        self.__msg_socket.bind(
            self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
        )
        # finally log progress
        if self.__logging:
            logger.debug(
                "Successfully binded to address: {} with pattern: {}.".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )
        logger.critical("Receive Mode is activated successfully!")
    except Exception as e:
        logger.exception(str(e))
        raise RuntimeError(
            "[NetGear_Async:ERROR] :: Failed to bind address: {} and pattern: {}{}!".format(
                (
                    self.__protocol
                    + "://"
                    + str(self.__address)
                    + ":"
                    + str(self.__port)
                ),
                self.__msg_pattern,
                " and Bidirectional Mode enabled" if self.__bi_mode else "",
            )
        )

    # loop until terminated
    while not self.__terminate:
        # get encoded data message from server withing timeout limit
        datamsg_encoded = await asyncio.wait_for(
            self.__msg_socket.recv(), timeout=self.__timeout
        )
        # retrieve data from message
        data = msgpack.unpackb(datamsg_encoded, use_list=False)
        # terminate if exit` flag received from server
        if data["terminate"]:
            # send confirmation message to server if bidirectional patterns
            if self.__msg_pattern < 2:
                # create termination confirmation message
                return_dict = dict(
                    terminated="Client-`{}` successfully terminated!".format(
                        self.__id
                    ),
                )
                # encode message
                retdata_enc = msgpack.packb(return_dict)
                # send message back to server
                await self.__msg_socket.send(retdata_enc)
            if self.__logging:
                logger.info("Termination signal received from server!")
            # break loop and terminate
            self.__terminate = True
            break
        # get encoded frame message from server withing timeout limit
        framemsg_encoded = await asyncio.wait_for(
            self.__msg_socket.recv_multipart(), timeout=self.__timeout
        )
        # retrieve frame from message
        frame = msgpack.unpackb(
            framemsg_encoded[0], use_list=False, object_hook=m.decode
        )

        # check if bidirectional patterns
        if self.__msg_pattern < 2:
            # handle bidirectional data transfer if enabled
            if self.__bi_mode and data["bi_mode"]:
                # handle empty queue
                if not self.__queue.empty():
                    return_data = await self.__queue.get()
                    self.__queue.task_done()
                else:
                    return_data = None
                # check if we are returning `ndarray` frames
                if not (return_data is None) and isinstance(
                    return_data, np.ndarray
                ):
                    # check whether the incoming frame is contiguous
                    if not (return_data.flags["C_CONTIGUOUS"]):
                        return_data = np.ascontiguousarray(
                            return_data, dtype=return_data.dtype
                        )

                    # create return type dict without data
                    rettype_dict = dict(
                        return_type=(type(return_data).__name__),
                        return_data=None,
                    )
                    # encode it
                    rettype_enc = msgpack.packb(rettype_dict)
                    # send it to server with correct flags
                    await self.__msg_socket.send(rettype_enc, flags=zmq.SNDMORE)

                    # encode return ndarray data
                    retframe_enc = msgpack.packb(return_data, default=m.encode)
                    # send it over network to server
                    await self.__msg_socket.send_multipart([retframe_enc])
                else:
                    # otherwise create type and data dict
                    return_dict = dict(
                        return_type=(type(return_data).__name__),
                        return_data=return_data
                        if not (return_data is None)
                        else "",
                    )
                    # encode it
                    retdata_enc = msgpack.packb(return_dict)
                    # send it over network to server
                    await self.__msg_socket.send(retdata_enc)
            elif self.__bi_mode or data["bi_mode"]:
                # raise error if bidirectional mode is disabled at server or client but not both
                raise RuntimeError(
                    "[NetGear_Async:ERROR] :: Invalid configuration! Bidirectional Mode is not activate on {} end.".format(
                        "client" if self.__bi_mode else "server"
                    )
                )
            else:
                # otherwise just send confirmation message to server
                await self.__msg_socket.send(
                    bytes(
                        "Data received on client: {} !".format(self.__id), "utf-8"
                    )
                )
        # yield received tuple(data-frame) if bidirectional mode or else just frame
        if self.__bi_mode:
            yield (data["data"], frame) if data["data"] else (None, frame)
        else:
            yield frame
        # sleep for sometime
        await asyncio.sleep(0)

transceive_data(self, data=None) async

Bidirectional Mode exclusive method to Transmit data (in Receive mode) and Receive data (in Send mode).

Parameters:

Name Type Description Default
data any

inputs data (of any datatype) for sending back to Server.

None
Source code in vidgear/gears/asyncio/netgear_async.py
async def transceive_data(self, data=None):
    """
    Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.

    Parameters:
        data (any): inputs data _(of any datatype)_ for sending back to Server.
    """
    recvd_data = None
    if not self.__terminate:
        if self.__bi_mode:
            if self.__receive_mode:
                await self.__queue.put(data)
            else:
                if not self.__queue.empty():
                    recvd_data = await self.__queue.get()
                    self.__queue.task_done()
        else:
            logger.error(
                "`transceive_data()` function cannot be used when Bidirectional Mode is disabled."
            )
    return recvd_data

 


Last update: August 11, 2021