NetGear_Async API usage examples can be found here ➶
NetGear_Async API parameters are explained here ➶
NetGear_Async can deliver the same performance as the NetGear API at about one-third of the memory consumption. It also provides complete server-client handling with options to use various protocols and patterns, similar to NetGear, but it is less flexible because it supports only a few of NetGear's Exclusive Modes.
NetGear_Async is built on zmq.asyncio and powered by uvloop, a high-performance asyncio event loop, to achieve unmatchable high-speed and lag-free video streaming over the network with minimal resource constraints. NetGear_Async can transfer thousands of frames in just a few seconds without causing any significant load on your system.
NetGear_Async provides complete server-client handling and options to use various protocols and patterns, similar to the NetGear API. Furthermore, NetGear_Async lets you define a custom Server as the source, so frames can be transformed easily before they are sent across the network.
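To illustrate what a custom source looks like, here is a minimal sketch that uses only the standard library. It assumes, based on the check in `__server_handler()` in the source listing on this page, that the server accepts whatever asynchronous generator object is stored under `config["generator"]`. The names `custom_frame_generator`, `is_valid_source`, and `collect` are hypothetical, and plain lists stand in for NumPy frames so the sketch runs without extra packages.

```python
import asyncio
import inspect


async def custom_frame_generator(frames):
    """Hypothetical custom source: transform each frame before it is sent."""
    for frame in frames:
        # stand-in transform: invert 8-bit pixel values
        yield [255 - px for px in frame]
        # hand control back to the event loop between frames
        await asyncio.sleep(0)


def is_valid_source(config):
    """Same shape as the server-side check: "generator" must hold an async generator object."""
    return (
        isinstance(config, dict)
        and "generator" in config
        and config["generator"] is not None
        and inspect.isasyncgen(config["generator"])
    )


async def collect(agen):
    """Drain an async generator into a list (stands in for the send loop)."""
    return [item async for item in agen]


# assign the generator object (not the function) to the configuration
config = {"generator": custom_frame_generator([[0, 128], [255, 1]])}
valid = is_valid_source(config)
sent = asyncio.run(collect(config["generator"]))
```

Note that the check expects a generator object, that is, the result of calling the `async def` function. Assigning the function itself would fail `inspect.isasyncgen()`.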
NetGear_Async now supports additional bidirectional data transmission between receiver (client) and sender (server) while transferring frames. Users can easily build complex applications such as Real-Time Video Chat in just a few lines of code.
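The hand-off behind bidirectional mode can be sketched with a plain `asyncio.Queue`. In the source listing on this page, `transceive_data()` puts data on a queue in Receive Mode and, in Send Mode, returns the next queued item or `None` when nothing is waiting. The sketch below is a simplification under a loud assumption: a single shared queue stands in for the network hop between the two ends, and `BidirectionalEndpoint` is a hypothetical class, not part of the library.

```python
import asyncio


class BidirectionalEndpoint:
    """Hypothetical stand-in for one end of a bidirectional connection."""

    def __init__(self, receive_mode, queue):
        self.receive_mode = receive_mode
        self.queue = queue  # assumption: shared queue replaces the network

    async def transceive_data(self, data=None):
        if self.receive_mode:
            # client side: queue data to be returned to the server
            await self.queue.put(data)
            return None
        if not self.queue.empty():
            # server side: fetch whatever the client sent back
            received = await self.queue.get()
            self.queue.task_done()
            return received
        # nothing has arrived yet
        return None


async def main():
    link = asyncio.Queue()
    client = BidirectionalEndpoint(receive_mode=True, queue=link)
    server = BidirectionalEndpoint(receive_mode=False, queue=link)
    first = await server.transceive_data()  # queue is still empty
    await client.transceive_data(data="Hello from client")
    second = await server.transceive_data()
    return first, second


result = asyncio.run(main())
```

Because the server side never blocks on an empty queue, callers should be prepared to receive `None` on any given iteration.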
In addition to all this, the NetGear_Async API also provides an internal wrapper around VideoGear, which itself provides internal access to both the CamGear and PiGear APIs, so it can transfer frames from any source supported by those APIs to the network.
NetGear_Async as of now supports four ZeroMQ messaging patterns:
zmq.PAIR(ZMQ Pair Pattern)
zmq.REQ/zmq.REP(ZMQ Request/Reply Pattern)
zmq.PUB/zmq.SUB(ZMQ Publish/Subscribe Pattern)
zmq.PUSH/zmq.PULL(ZMQ Push/Pull Pattern)
The supported protocols are tcp and ipc.
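The following sketch shows how the integer `pattern` value and the `protocol` string resolve to a socket pair and an endpoint. It mirrors the validation in the constructor listed on this page (invalid patterns fall back to `0`, invalid protocols fall back to `tcp`, the address defaults to `*` in Receive Mode and `localhost` otherwise, and the port defaults to `5555`). The helper names are hypothetical, and socket-type names are plain strings here so the sketch runs without pyzmq.

```python
# (sender socket, receiver socket) for each pattern index
VALID_PATTERNS = {
    0: ("PAIR", "PAIR"),
    1: ("REQ", "REP"),
    2: ("PUB", "SUB"),
    3: ("PUSH", "PULL"),
}
VALID_PROTOCOLS = ("tcp", "ipc")


def resolve_pattern(pattern):
    """Return (index, socket pair), falling back to 0 (PAIR) on invalid input."""
    if isinstance(pattern, int) and pattern in VALID_PATTERNS:
        return pattern, VALID_PATTERNS[pattern]
    return 0, VALID_PATTERNS[0]


def resolve_protocol(protocol):
    """Return the protocol if supported, otherwise fall back to tcp."""
    if isinstance(protocol, str) and protocol in VALID_PROTOCOLS:
        return protocol
    return "tcp"


def build_endpoint(protocol="tcp", address=None, port=None, receive_mode=False):
    """Assemble the endpoint string that the socket binds or connects to."""
    if address is None:
        # the receiver binds on all interfaces, the sender connects locally
        address = "*" if receive_mode else "localhost"
    if port is None:
        port = "5555"
    return resolve_protocol(protocol) + "://" + str(address) + ":" + str(port)


print(resolve_pattern(1))
print(resolve_pattern(7))
print(build_endpoint(receive_mode=True))
print(build_endpoint(protocol="udp"))
```

In the listed source, the sender uses the first socket type of the pair and connects, while the receiver uses the second and binds.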
Source code in vidgear/gears/asyncio/netgear_async.py
class NetGear_Async:
    """
    NetGear_Async can generate the same performance as NetGear API at about one-third the memory consumption, and also provide complete server-client handling with various
    options to use variable protocols/patterns similar to NetGear, but lacks in term of flexibility as it supports only a few NetGear's Exclusive Modes.

    NetGear_Async is built on `zmq.asyncio`, and powered by a high-performance asyncio event loop called uvloop to achieve unmatchable high-speed and lag-free video streaming
    over the network with minimal resource constraints. NetGear_Async can transfer thousands of frames in just a few seconds without causing any significant load on your system.

    NetGear_Async provides complete server-client handling and options to use variable protocols/patterns similar to NetGear API. Furthermore, NetGear_Async allows us to define
    our custom Server as source to transform frames easily before sending them across the network.

    NetGear_Async now supports additional **bidirectional data transmission** between receiver(client) and sender(server) while transferring frames.
    Users can easily build complex applications such as _Real-Time Video Chat_ in just a few lines of code.

    In addition to all this, NetGear_Async API also provides internal wrapper around VideoGear, which itself provides internal access to both CamGear and PiGear APIs, thereby
    granting it exclusive power for transferring frames incoming from any source to the network.

    NetGear_Async as of now supports four ZeroMQ messaging patterns:

    - `zmq.PAIR` _(ZMQ Pair Pattern)_
    - `zmq.REQ/zmq.REP` _(ZMQ Request/Reply Pattern)_
    - `zmq.PUB/zmq.SUB` _(ZMQ Publish/Subscribe Pattern)_
    - `zmq.PUSH/zmq.PULL` _(ZMQ Push/Pull Pattern)_

    Whereas supported protocol are: `tcp` and `ipc`.
    """

    def __init__(
        self,
        # NetGear_Async parameters
        address: str = None,
        port: str = None,
        protocol: str = "tcp",
        pattern: int = 0,
        receive_mode: bool = False,
        timeout: Union[int, float] = 0.0,
        # Videogear parameters
        enablePiCamera: bool = False,
        stabilize: bool = False,
        source: Any = None,
        camera_num: int = 0,
        stream_mode: bool = False,
        backend: int = 0,
        colorspace: str = None,
        resolution: Tuple[int, int] = (640, 480),
        framerate: Union[int, float] = 25,
        time_delay: int = 0,
        # common parameters
        logging: bool = False,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the NetGear_Async class.

        Parameters:
            address (str): sets the valid network address of the Server/Client.
            port (str): sets the valid Network Port of the Server/Client.
            protocol (str): sets the valid messaging protocol between Server/Client.
            pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
            receive_mode (bool): select the NetGear_Async's Mode of operation.
            timeout (int/float): controls the maximum waiting time(in sec) after which Client throws `TimeoutError`.
            enablePiCamera (bool): provide access to PiGear(if True) or CamGear(if False) APIs respectively.
            stabilize (bool): enable access to Stabilizer Class for stabilizing frames.
            camera_num (int): selects the camera module index which will be used as Rpi source.
            resolution (tuple): sets the resolution (i.e. `(width,height)`) of the Rpi source.
            framerate (int/float): sets the framerate of the Rpi source.
            source (based on input): defines the source for the input stream.
            stream_mode (bool): controls the exclusive YouTube Mode.
            backend (int): selects the backend for OpenCV's VideoCapture class.
            colorspace (str): selects the colorspace of the input stream.
            logging (bool): enables/disables logging.
            time_delay (int): time delay (in sec) before start reading the frames.
            options (dict): provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        import_dependency_safe(
            "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
        )
        import_dependency_safe("msgpack" if msgpack is None else "")
        import_dependency_safe("msgpack_numpy" if m is None else "")

        # define valid messaging patterns => `0`: PAIR, `1`:(REQ, REP), `2`:(PUB, SUB), `3`:(PUSH, PULL)
        valid_messaging_patterns = {
            0: (zmq.PAIR, zmq.PAIR),
            1: (zmq.REQ, zmq.REP),
            2: (zmq.PUB, zmq.SUB),
            3: (zmq.PUSH, zmq.PULL),
        }

        # check whether user-defined messaging pattern is valid
        if isinstance(pattern, int) and pattern in valid_messaging_patterns:
            # assign value
            self.__msg_pattern = pattern
            self.__pattern = valid_messaging_patterns[pattern]
        else:
            # otherwise default to 0:`zmq.PAIR`
            self.__msg_pattern = 0
            self.__pattern = valid_messaging_patterns[self.__msg_pattern]
            self.__logging and logger.warning(
                "Invalid pattern {pattern}. Defaulting to `zmq.PAIR`!".format(
                    pattern=pattern
                )
            )

        # check whether user-defined messaging protocol is valid
        if isinstance(protocol, str) and protocol in ["tcp", "ipc"]:
            # assign value
            self.__protocol = protocol
        else:
            # else default to `tcp` protocol
            self.__protocol = "tcp"
            self.__logging and logger.warning("Invalid protocol. Defaulting to `tcp`!")

        # initialize Termination flag
        self.__terminate = False
        # initialize and assign `Receive Mode`
        self.__receive_mode = receive_mode
        # initialize stream handler
        self.__stream = None
        # initialize Messaging Socket
        self.__msg_socket = None
        # initialize NetGear_Async's configuration dictionary
        self.config = {}
        # asyncio queue handler
        self.__queue = None
        # define Bidirectional mode
        self.__bi_mode = False  # handles Bidirectional mode state

        # assign timeout for Receiver end
        if timeout and isinstance(timeout, (int, float)):
            self.__timeout = float(timeout)
        else:
            self.__timeout = 15.0

        # generate 8-digit random system id
        self.__id = "".join(
            secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
        )

        # Handle user-defined options dictionary values
        # reformat dictionary
        options = {str(k).strip(): v for k, v in options.items()}
        # handle bidirectional mode
        if "bidirectional_mode" in options:
            value = options["bidirectional_mode"]
            # also check if pattern and source is valid
            if isinstance(value, bool) and pattern < 2 and source is None:
                # activate Bidirectional mode if specified
                self.__bi_mode = value
            else:
                # otherwise disable it
                self.__bi_mode = False
                logger.warning("Bidirectional data transmission is disabled!")
            # handle errors and logging
            if pattern >= 2:
                # raise error
                raise ValueError(
                    "[NetGear_Async:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )
            elif not (source is None):
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Custom source must be used when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )
            elif isinstance(value, bool) and self.__logging:
                # log Bidirectional mode activation
                logger.debug(
                    "Bidirectional Data Transmission is {} for this connection!".format(
                        "enabled" if value else "disabled"
                    )
                )
            else:
                logger.error("`bidirectional_mode` value is invalid!")
            # clean
            del options["bidirectional_mode"]

        # Setup and assign event loop policy
        if platform.system() == "Windows":
            # On Windows, VidGear requires the ``WindowsSelectorEventLoop``, but Python 3.8 and above,
            # defaults to an ``ProactorEventLoop`` loop that is not compatible with it. Thereby,
            # we had to set it manually.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        else:
            if not (uvloop is None):
                # Latest uvloop eventloop is only available for UNIX machines.
                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            else:
                # log if not present
                import_dependency_safe("uvloop", error="log")

        # Retrieve event loop and assign it
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            # otherwise create one
            logger.critical("No running event loop found. Creating a new one.")
            self.loop = asyncio.new_event_loop()

        # log eventloop for debugging
        self.__logging and logger.info(
            "Using ``{}`` event loop for this process.".format(
                self.loop.__class__.__name__
            )
        )

        # define messaging asynchronous Context
        self.__msg_context = zmq.asyncio.Context()

        # check whether `Receive Mode` is enabled
        if receive_mode:
            # assign local IP address if None
            if address is None:
                self.__address = "*"  # define address
            else:
                self.__address = address
            # assign default port address if None
            if port is None:
                self.__port = "5555"
            else:
                self.__port = port
        else:
            # Handle video source
            if source is None:
                self.config = {"generator": None}
                self.__logging and logger.warning("Given source is of NoneType!")
            else:
                # define stream with necessary params
                self.__stream = VideoGear(
                    enablePiCamera=enablePiCamera,
                    stabilize=stabilize,
                    source=source,
                    camera_num=camera_num,
                    stream_mode=stream_mode,
                    backend=backend,
                    colorspace=colorspace,
                    resolution=resolution,
                    framerate=framerate,
                    logging=logging,
                    time_delay=time_delay,
                    **options
                )
                # define default frame generator in configuration
                self.config = {"generator": self.__frame_generator()}
            # assign local ip address if None
            if address is None:
                self.__address = "localhost"
            else:
                self.__address = address
            # assign default port address if None
            if port is None:
                self.__port = "5555"
            else:
                self.__port = port
            # add server task handler
            self.task = None

        # create asyncio queue if bidirectional mode activated
        self.__queue = asyncio.Queue() if self.__bi_mode else None

    def launch(self) -> T:
        """
        Launches an asynchronous generators and loop executors for respective task.
        """
        # check if receive mode enabled
        if self.__receive_mode:
            self.__logging and logger.debug(
                "Launching NetGear_Async asynchronous generator!"
            )
            # run loop executor for Receiver asynchronous generator
            self.loop.run_in_executor(None, self.recv_generator)
        else:
            # Otherwise launch Server handler
            self.__logging and logger.debug(
                "Creating NetGear_Async asynchronous server handler!"
            )
            # create task for Server Handler
            self.task = self.loop.create_task(self.__server_handler())
        # return instance
        return self

    async def __server_handler(self):
        """
        Handles various Server-end processes/tasks.
        """
        # validate assigned frame generator in NetGear_Async configuration
        if isinstance(self.config, dict) and "generator" in self.config:
            # check if its assigned value is a asynchronous generator
            if self.config["generator"] is None or not inspect.isasyncgen(
                self.config["generator"]
            ):
                # otherwise raise error
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Invalid configuration. Assigned generator must be a asynchronous generator function/method only!"
                )
        else:
            # raise error if validation fails
            raise RuntimeError(
                "[NetGear_Async:ERROR] :: Assigned NetGear_Async configuration is invalid!"
            )

        # define our messaging socket
        self.__msg_socket = self.__msg_context.socket(self.__pattern[0])

        # if req/rep pattern, define additional flags
        if self.__msg_pattern == 1:
            self.__msg_socket.REQ_RELAXED = True
            self.__msg_socket.REQ_CORRELATE = True

        # if pub/sub pattern, define additional optimizer
        if self.__msg_pattern == 2:
            self.__msg_socket.set_hwm(1)

        # try connecting socket to assigned protocol, address and port
        try:
            self.__msg_socket.connect(
                self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
            )
            # finally log if successful
            self.__logging and logger.debug(
                "Successfully connected to address: {} with pattern: {}.".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )
            logger.critical(
                "Send Mode is successfully activated and ready to send data!"
            )
        except Exception as e:
            # log and raise error if failed
            logger.exception(str(e))
            if self.__bi_mode:
                logger.error(
                    "Failed to activate Bidirectional Mode for this connection!"
                )
            raise ValueError(
                "[NetGear_Async:ERROR] :: Failed to connect address: {} and pattern: {}!".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )

        # loop over our Asynchronous frame generator
        async for dataframe in self.config["generator"]:
            # extract data if bidirectional mode
            if self.__bi_mode and len(dataframe) == 2:
                (data, frame) = dataframe
                if not (data is None) and isinstance(data, np.ndarray):
                    logger.warning(
                        "Skipped unsupported `data` of datatype: {}!".format(
                            type(data).__name__
                        )
                    )
                    data = None
                assert isinstance(
                    frame, np.ndarray
                ), "[NetGear_Async:ERROR] :: Invalid data received from server end!"
            elif self.__bi_mode:
                # raise error for invalid data
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Send Mode only accepts tuple(data, frame) as input in Bidirectional Mode. \
                    Kindly refer vidgear docs!"
                )
            else:
                # otherwise just make a copy of frame
                frame = np.copy(dataframe)
                data = None

            # check if retrieved frame is `CONTIGUOUS`
            if not (frame.flags["C_CONTIGUOUS"]):
                # otherwise make it
                frame = np.ascontiguousarray(frame, dtype=frame.dtype)

            # create data dict
            data_dict = dict(
                terminate=False,
                bi_mode=self.__bi_mode,
                data=data if not (data is None) else "",
            )
            # encode it
            data_enc = msgpack.packb(data_dict)
            # send the encoded data with correct flags
            await self.__msg_socket.send(data_enc, flags=zmq.SNDMORE)

            # encode frame
            frame_enc = msgpack.packb(frame, default=m.encode)
            # send the encoded frame
            await self.__msg_socket.send_multipart([frame_enc])

            # check if bidirectional patterns used
            if self.__msg_pattern < 2:
                # handle bidirectional data transfer if enabled
                if self.__bi_mode:
                    # get receiver encoded message within timeout limit
                    recvdmsg_encoded = await asyncio.wait_for(
                        self.__msg_socket.recv(), timeout=self.__timeout
                    )
                    # retrieve receiver data from encoded message
                    recvd_data = msgpack.unpackb(recvdmsg_encoded, use_list=False)
                    # check message type
                    if recvd_data["return_type"] == "ndarray":  # numpy.ndarray
                        # get encoded frame from receiver
                        recvdframe_encoded = await asyncio.wait_for(
                            self.__msg_socket.recv_multipart(), timeout=self.__timeout
                        )
                        # retrieve frame and put in queue
                        await self.__queue.put(
                            msgpack.unpackb(
                                recvdframe_encoded[0],
                                use_list=False,
                                object_hook=m.decode,
                            )
                        )
                    else:
                        # otherwise put data directly in queue
                        await self.__queue.put(
                            recvd_data["return_data"]
                            if recvd_data["return_data"]
                            else None
                        )
                else:
                    # otherwise log received confirmation
                    recv_confirmation = await asyncio.wait_for(
                        self.__msg_socket.recv(), timeout=self.__timeout
                    )
                    self.__logging and logger.debug(recv_confirmation)

    async def recv_generator(self) -> AsyncGenerator[Tuple[Any, NDArray], NDArray]:
        """
        A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
        """
        # check whether `receive mode` is activated
        if not (self.__receive_mode):
            # raise Value error and exit
            self.__terminate = True
            raise ValueError(
                "[NetGear_Async:ERROR] :: `recv_generator()` function cannot be accessed while `receive_mode` is disabled. Kindly refer vidgear docs!"
            )

        # initialize and define messaging socket
        self.__msg_socket = self.__msg_context.socket(self.__pattern[1])

        # define exclusive socket options for patterns
        if self.__msg_pattern == 2:
            self.__msg_socket.set_hwm(1)
            self.__msg_socket.setsockopt(zmq.SUBSCRIBE, b"")

        try:
            # bind socket to the assigned protocol, address and port
            self.__msg_socket.bind(
                self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
            )
            # finally log progress
            self.__logging and logger.debug(
                "Successfully binded to address: {} with pattern: {}.".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )
            logger.critical("Receive Mode is activated successfully!")
        except Exception as e:
            logger.exception(str(e))
            raise RuntimeError(
                "[NetGear_Async:ERROR] :: Failed to bind address: {} and pattern: {}{}!".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                    " and Bidirectional Mode enabled" if self.__bi_mode else "",
                )
            )

        # loop until terminated
        while not self.__terminate:
            # get encoded data message from server within timeout limit
            datamsg_encoded = await asyncio.wait_for(
                self.__msg_socket.recv(), timeout=self.__timeout
            )
            # retrieve data from message
            data = msgpack.unpackb(datamsg_encoded, use_list=False)
            # terminate if `exit` flag received from server
            if data["terminate"]:
                # send confirmation message to server if bidirectional patterns
                if self.__msg_pattern < 2:
                    # create termination confirmation message
                    return_dict = dict(
                        terminated="Client-`{}` successfully terminated!".format(
                            self.__id
                        ),
                    )
                    # encode message
                    retdata_enc = msgpack.packb(return_dict)
                    # send message back to server
                    await self.__msg_socket.send(retdata_enc)
                self.__logging and logger.info(
                    "Termination signal received from server!"
                )
                # break loop and terminate
                self.__terminate = True
                break

            # get encoded frame message from server within timeout limit
            framemsg_encoded = await asyncio.wait_for(
                self.__msg_socket.recv_multipart(), timeout=self.__timeout
            )
            # retrieve frame from message
            frame = msgpack.unpackb(
                framemsg_encoded[0], use_list=False, object_hook=m.decode
            )

            # check if bidirectional patterns
            if self.__msg_pattern < 2:
                # handle bidirectional data transfer if enabled
                if self.__bi_mode and data["bi_mode"]:
                    # handle empty queue
                    if not self.__queue.empty():
                        return_data = await self.__queue.get()
                        self.__queue.task_done()
                    else:
                        return_data = None
                    # check if we are returning `ndarray` frames
                    if not (return_data is None) and isinstance(
                        return_data, np.ndarray
                    ):
                        # check whether the incoming frame is contiguous
                        if not (return_data.flags["C_CONTIGUOUS"]):
                            return_data = np.ascontiguousarray(
                                return_data, dtype=return_data.dtype
                            )
                        # create return type dict without data
                        rettype_dict = dict(
                            return_type=(type(return_data).__name__),
                            return_data=None,
                        )
                        # encode it
                        rettype_enc = msgpack.packb(rettype_dict)
                        # send it to server with correct flags
                        await self.__msg_socket.send(rettype_enc, flags=zmq.SNDMORE)
                        # encode return ndarray data
                        retframe_enc = msgpack.packb(return_data, default=m.encode)
                        # send it over network to server
                        await self.__msg_socket.send_multipart([retframe_enc])
                    else:
                        # otherwise create type and data dict
                        return_dict = dict(
                            return_type=(type(return_data).__name__),
                            return_data=(
                                return_data if not (return_data is None) else ""
                            ),
                        )
                        # encode it
                        retdata_enc = msgpack.packb(return_dict)
                        # send it over network to server
                        await self.__msg_socket.send(retdata_enc)
                elif self.__bi_mode or data["bi_mode"]:
                    # raise error if bidirectional mode is disabled at server or client but not both
                    raise RuntimeError(
                        "[NetGear_Async:ERROR] :: Invalid configuration! Bidirectional Mode is not activate on {} end.".format(
                            "client" if self.__bi_mode else "server"
                        )
                    )
                else:
                    # otherwise just send confirmation message to server
                    await self.__msg_socket.send(
                        bytes(
                            "Data received on client: {} !".format(self.__id), "utf-8"
                        )
                    )
            # yield received tuple(data-frame) if bidirectional mode or else just frame
            if self.__bi_mode:
                yield (data["data"], frame) if data["data"] else (None, frame)
            else:
                yield frame
            # sleep for sometime
            await asyncio.sleep(0)

    async def __frame_generator(self):
        """
        Returns a default frame-generator for NetGear_Async's Server Handler.
        """
        # start stream
        self.__stream.start()
        # loop over stream until its terminated
        while not self.__terminate:
            # read frames
            frame = self.__stream.read()
            # break if NoneType
            if frame is None:
                break
            # yield frame
            yield frame
            # sleep for sometime
            await asyncio.sleep(0)

    async def transceive_data(self, data: Any = None) -> Any:
        """
        Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.

        Parameters:
            data (any): inputs data _(of any datatype)_ for sending back to Server.
        """
        recvd_data = None
        if not self.__terminate:
            if self.__bi_mode:
                if self.__receive_mode:
                    await self.__queue.put(data)
                else:
                    if not self.__queue.empty():
                        recvd_data = await self.__queue.get()
                        self.__queue.task_done()
            else:
                logger.error(
                    "`transceive_data()` function cannot be used when Bidirectional Mode is disabled."
                )
        return recvd_data

    async def __terminate_connection(self, disable_confirmation=False):
        """
        Internal asyncio method to safely terminate ZMQ connection and queues

        Parameters:
            disable_confirmation (boolean): Force disable termination confirmation from client in bidirectional patterns.
        """
        # log termination
        self.__logging and logger.debug(
            "Terminating various {} Processes. Please wait.".format(
                "Receive Mode" if self.__receive_mode else "Send Mode"
            )
        )

        # check whether `receive_mode` is enabled or not
        if self.__receive_mode:
            # indicate that process should be terminated
            self.__terminate = True
        else:
            # indicate that process should be terminated
            self.__terminate = True
            # terminate stream
            if not (self.__stream is None):
                self.__stream.stop()
            # signal `exit` flag for termination!
            data_dict = dict(terminate=True)
            data_enc = msgpack.packb(data_dict)
            await self.__msg_socket.send(data_enc)
            # check if bidirectional patterns
            if self.__msg_pattern < 2 and not disable_confirmation:
                # then receive and log confirmation
                recv_confirmation = await self.__msg_socket.recv()
                recvd_conf = msgpack.unpackb(recv_confirmation, use_list=False)
                self.__logging and "terminated" in recvd_conf and logger.debug(
                    recvd_conf["terminated"]
                )
        # close socket
        self.__msg_socket.setsockopt(zmq.LINGER, 0)
        self.__msg_socket.close()
        # handle asyncio queues in bidirectional mode
        if self.__bi_mode:
            # empty queue if not
            while not self.__queue.empty():
                try:
                    self.__queue.get_nowait()
                except asyncio.QueueEmpty:
                    continue
                self.__queue.task_done()
            # join queues
            await self.__queue.join()

        logger.critical(
            "{} successfully terminated!".format(
                "Receive Mode" if self.__receive_mode else "Send Mode"
            )
        )

    def close(self, skip_loop: bool = False) -> None:
        """
        Terminates all NetGear_Async Asynchronous processes gracefully.

        Parameters:
            skip_loop (Boolean): (optional)used only if don't want to close eventloop(required in pytest).
        """
        # close event loop if specified
        if not (skip_loop):
            # close connection gracefully
            self.loop.run_until_complete(self.__terminate_connection())
            self.loop.close()
        else:
            # otherwise create a task
            asyncio.ensure_future(
                self.__terminate_connection(disable_confirmation=True)
            )
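The `timeout` handling in the listing above can be isolated into a short sketch. In the constructor, a falsy or non-numeric `timeout` falls back to 15 seconds, and each receive call is wrapped in `asyncio.wait_for`, which is where the `TimeoutError` mentioned in the parameter description originates. The names `resolve_timeout`, `recv_with_timeout`, `fast_recv`, and `slow_recv` are hypothetical, and the coroutines stand in for socket receives.

```python
import asyncio


def resolve_timeout(timeout):
    """Return the timeout in seconds, falling back to 15.0 on 0 or non-numeric input."""
    if timeout and isinstance(timeout, (int, float)):
        return float(timeout)
    return 15.0


async def recv_with_timeout(recv_coro, timeout):
    """Await a receive call, raising asyncio.TimeoutError if it takes too long."""
    return await asyncio.wait_for(recv_coro, timeout=resolve_timeout(timeout))


async def fast_recv():
    # stand-in for a message that arrives immediately
    return b"frame"


async def slow_recv():
    # stand-in for a sender that has stalled
    await asyncio.sleep(1.0)
    return b"frame"


async def main():
    got = await recv_with_timeout(fast_recv(), 0.5)
    try:
        await recv_with_timeout(slow_recv(), 0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return got, timed_out


outcome = asyncio.run(main())
```

One consequence of the fallback is that passing `timeout=0` does not disable the limit. It selects the 15 second default instead.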