NetGear is exclusively designed to transfer video frames synchronously and asynchronously between interconnecting systems over the network in real-time.
NetGear implements a high-level wrapper around the PyZMQ Python library, which provides Python bindings for ZeroMQ: a high-performance asynchronous distributed messaging library that provides a message queue but, unlike message-oriented middleware, can run without a dedicated message broker.
NetGear also supports real-time Frame Compression capabilities for optimizing performance while sending the frames directly over the network, by encoding the frame before sending it and decoding it on the client's end automatically in real-time.
Info
The NetGear API now internally implements a robust Lazy Pirate pattern (auto-reconnection) for its synchronous messaging patterns (i.e. zmq.PAIR and zmq.REQ/zmq.REP) at both the Server and Client ends. Instead of doing a blocking receive, the API will:
Poll the socket and receive from it only when it's sure a reply has arrived.
Attempt to reconnect, if no reply has arrived within a timeout period.
Abandon the connection if there is still no reply after several requests.
NetGear currently supports three ZeroMQ messaging patterns:
zmq.PAIR (ZMQ Pair Pattern)
zmq.REQ/zmq.REP (ZMQ Request/Reply Pattern)
zmq.PUB/zmq.SUB (ZMQ Publish/Subscribe Pattern)
The supported protocols are tcp and ipc.
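As a rough sketch of how a pattern number and protocol turn into a socket role and endpoint, the function below follows the defaults visible in the constructor source on this page: an invalid pattern falls back to `0` (PAIR), an unsupported protocol falls back to `tcp`, the port defaults to `"5555"`, and the address defaults to `"*"` in Receive Mode or `"localhost"` in Send Mode. The name `resolve_connection` is hypothetical, and plain strings stand in for the `zmq` socket-type constants so the example runs without PyZMQ installed.

```python
from typing import Optional, Tuple

# (send-side socket, receive-side socket) for each supported pattern number.
VALID_PATTERNS = {0: ("PAIR", "PAIR"), 1: ("REQ", "REP"), 2: ("PUB", "SUB")}
VALID_PROTOCOLS = ("tcp", "ipc")


def resolve_connection(
    pattern: int = 0,
    protocol: Optional[str] = None,
    address: Optional[str] = None,
    port: Optional[str] = None,
    receive_mode: bool = False,
) -> Tuple[int, str, str]:
    """Return (pattern, socket type name, endpoint) after applying the defaults."""
    # Unknown patterns fall back to 0 (PAIR).
    if not (isinstance(pattern, int) and pattern in VALID_PATTERNS):
        pattern = 0
    # Unsupported or missing protocols fall back to tcp.
    if protocol not in VALID_PROTOCOLS:
        protocol = "tcp"
    # The receiver binds on all interfaces; the sender connects to localhost.
    if address is None:
        address = "*" if receive_mode else "localhost"
    if port is None:
        port = "5555"
    send_type, recv_type = VALID_PATTERNS[pattern]
    socket_type = recv_type if receive_mode else send_type
    return pattern, socket_type, "{}://{}:{}".format(protocol, address, port)


sender = resolve_connection()
subscriber = resolve_connection(pattern=2, receive_mode=True)
fallback = resolve_connection(pattern=7, protocol="udp", port="5566")
```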
Modes of Operation
Primary Modes
NetGear API primarily has two modes of operation:
Send Mode: employs the send() function to send video frames over the network in real-time.
Receive Mode: employs the recv() function to receive frames sent over the network with Send Mode in real-time. In some patterns, this mode also sends back a confirmation when a frame is received successfully.
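In the constructor source on this page, Receive Mode starts a daemon thread that moves incoming data into a bounded deque (maximum length 96) and appends `None` when the connection ends. The sketch below reproduces only that hand-off using the standard library. `QueuedReceiver` is a hypothetical class, its `source` iterable stands in for the ZeroMQ socket, and its `recv()` and `close()` methods are assumptions about how a consumer might drain the queue, not NetGear's actual methods.

```python
import time
from collections import deque
from threading import Thread
from typing import Any, Iterable, Optional


class QueuedReceiver:
    """Background thread fills a bounded deque; recv() drains it in order."""

    def __init__(self, source: Iterable[Any], maxlen: int = 96) -> None:
        self._source = iter(source)
        self._maxlen = maxlen
        self._queue: deque = deque(maxlen=maxlen)
        self._terminate = False
        self._thread = Thread(target=self._handler, name="receiver", daemon=True)
        self._thread.start()

    def _handler(self) -> None:
        while not self._terminate:
            # Pause while the queue is full so that no frame is overwritten.
            if len(self._queue) >= self._maxlen:
                time.sleep(0.000001)
                continue
            try:
                item = next(self._source)
            except StopIteration:
                # The source ended: signal termination with a None sentinel.
                self._terminate = True
                self._queue.append(None)
                break
            self._queue.append(item)

    def recv(self) -> Optional[Any]:
        """Return the next item, or None once the source is exhausted."""
        while True:
            if self._queue:
                return self._queue.popleft()
            if self._terminate and not self._thread.is_alive():
                # Re-check the queue in case an item arrived just before exit.
                return self._queue.popleft() if self._queue else None
            time.sleep(0.000001)

    def close(self) -> None:
        self._terminate = True
        self._thread.join()


receiver = QueuedReceiver(source=range(5))
frames = []
while True:
    frame = receiver.recv()
    if frame is None:
        break
    frames.append(frame)
receiver.close()
```

The bounded deque decouples network reads from frame consumption: a slow consumer makes the producer pause instead of growing memory without limit.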
Exclusive Modes
In addition to these primary modes, NetGear API offers application-specific Exclusive Modes:
Multi-Servers Mode: In this exclusive mode, NetGear API handles multiple servers at once, providing access to frames and unidirectional data transfer from multiple Servers/Publishers across the network in real-time.
Multi-Clients Mode: In this exclusive mode, NetGear API handles multiple clients at once, providing access to frames and unidirectional data transfer to multiple Clients/Consumers across the network in real-time.
Bidirectional Mode: This exclusive mode supports bidirectional data transmission between Server and Client along with video frames.
Secure Mode: In this exclusive mode, NetGear API provides easy access to ZeroMQ's Security Layers, which enable strong encryption of data and authentication between the Server and Client using custom certificates/keys, bringing standardized privacy and authentication to distributed systems over the network.
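The exclusive modes cannot be combined freely. The checks below are a simplified sketch of the compatibility rules found in the constructor source on this page: Multi-Server and Multi-Client Mode need pattern `1` or `2`, Bidirectional Mode needs pattern `0` or `1`, the two multi modes are mutually exclusive, and SSH tunneling cannot be combined with either of them. The function name `validate_exclusive_modes` and its return value are hypothetical. Unlike the original constructor, this sketch only raises when a mode is actually enabled.

```python
from typing import List, Optional


def validate_exclusive_modes(
    pattern: int,
    multiserver_mode: bool = False,
    multiclient_mode: bool = False,
    bidirectional_mode: bool = False,
    ssh_tunnel_mode: Optional[str] = None,
) -> List[str]:
    """Return the names of the active modes, or raise ValueError on a conflict."""
    # Multi-Server and Multi-Client Mode are unavailable with pattern 0 (PAIR).
    if (multiserver_mode or multiclient_mode) and pattern == 0:
        raise ValueError("Multi-Server/Multi-Client Mode needs pattern 1 or 2")
    # Bidirectional Mode is unavailable with pattern 2 (PUB/SUB).
    if bidirectional_mode and pattern == 2:
        raise ValueError("Bidirectional Mode needs pattern 0 or 1")
    if multiserver_mode and multiclient_mode:
        raise ValueError("Multi-Server and Multi-Client Mode are mutually exclusive")
    if ssh_tunnel_mode and (multiserver_mode or multiclient_mode):
        raise ValueError("SSH tunneling cannot be combined with a multi mode")

    active = []
    if multiserver_mode:
        active.append("multiserver")
    if multiclient_mode:
        active.append("multiclient")
    if bidirectional_mode:
        active.append("bidirectional")
    if ssh_tunnel_mode:
        active.append("ssh_tunnel")
    return active


modes = validate_exclusive_modes(1, multiserver_mode=True, bidirectional_mode=True)
```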
class NetGear:
    """
    NetGear is exclusively designed to transfer video frames synchronously and asynchronously between interconnecting systems over the network in real-time.

    NetGear implements a high-level wrapper around the PyZMQ Python library, which provides Python bindings for ZeroMQ - a high-performance asynchronous distributed messaging library
    that provides a message queue, but unlike message-oriented middleware, its system can run without a dedicated message broker.

    NetGear also supports real-time Frame Compression capabilities for optimizing performance while sending the frames directly over the network, by encoding the frame before sending
    it and decoding it on the client's end automatically in real-time.

    !!! info
        NetGear API now internally implements robust *Lazy Pirate pattern* (auto-reconnection) for its synchronous messaging patterns _(i.e. `zmq.PAIR` & `zmq.REQ/zmq.REP`)_
        at both Server and Client ends, where its API instead of doing a blocking receive, will:

        * Poll the socket and receive from it only when it's sure a reply has arrived.
        * Attempt to reconnect, if no reply has arrived within a timeout period.
        * Abandon the connection if there is still no reply after several requests.

    NetGear as of now seamlessly supports three ZeroMQ messaging patterns:

    - `zmq.PAIR` _(ZMQ Pair Pattern)_
    - `zmq.REQ/zmq.REP` _(ZMQ Request/Reply Pattern)_
    - `zmq.PUB/zmq.SUB` _(ZMQ Publish/Subscribe Pattern)_

    _whereas the supported protocols are: `tcp` and `ipc`_.

    ??? tip "Modes of Operation"

        * **Primary Modes**

            NetGear API primarily has two modes of operation:

            * **Send Mode:** _which employs `send()` function to send video frames over the network in real-time._

            * **Receive Mode:** _which employs `recv()` function to receive frames, sent over the network with *Send Mode* in real-time. The mode sends back confirmation when the frame is received successfully in some patterns._

        * **Exclusive Modes**

            In addition to these primary modes, NetGear API offers application-specific Exclusive Modes:

            * **Multi-Servers Mode:** _In this exclusive mode, NetGear API robustly **handles multiple servers at once**, thereby providing seamless access to frames and unidirectional data transfer from multiple Servers/Publishers across the network in real-time._

            * **Multi-Clients Mode:** _In this exclusive mode, NetGear API robustly **handles multiple clients at once**, thereby providing seamless access to frames and unidirectional data transfer to multiple Client/Consumers across the network in real-time._

            * **Bidirectional Mode:** _This exclusive mode **provides seamless support for bidirectional data transmission between Server and Client along with video frames**._

            * **Secure Mode:** _In this exclusive mode, NetGear API **provides easy access to powerful, smart & secure ZeroMQ's Security Layers** that enables strong encryption on data, and unbreakable authentication between the Server and Client with the help of custom certificates/keys that brings cheap, standardized privacy and authentication for distributed systems over the network._
    """

    def __init__(
        self,
        address: str = None,
        port: str = None,
        protocol: str = None,
        pattern: int = 0,
        receive_mode: bool = False,
        logging: bool = False,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the NetGear class.

        Parameters:
            address (str): sets the valid network address of the Server/Client.
            port (str): sets the valid Network Port of the Server/Client.
            protocol (str): sets the valid messaging protocol between Server/Client.
            pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
            receive_mode (bool): select the Netgear's Mode of operation.
            logging (bool): enables/disables logging.
            options (dict): provides the flexibility to alter various NetGear internal properties.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        import_dependency_safe(
            "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
        )
        import_dependency_safe(
            "simplejpeg" if simplejpeg is None else "", error="log", min_version="1.6.1"
        )

        # define valid messaging patterns => `0`: zmq.PAIR, `1`:(zmq.REQ,zmq.REP), and `2`:(zmq.PUB,zmq.SUB)
        valid_messaging_patterns = {
            0: (zmq.PAIR, zmq.PAIR),
            1: (zmq.REQ, zmq.REP),
            2: (zmq.PUB, zmq.SUB),
        }

        # Handle messaging pattern
        msg_pattern = None
        # check whether user-defined messaging pattern is valid
        if isinstance(pattern, int) and pattern in valid_messaging_patterns.keys():
            # assign value
            msg_pattern = valid_messaging_patterns[pattern]
        else:
            # otherwise default to 0:`zmq.PAIR`
            pattern = 0
            msg_pattern = valid_messaging_patterns[pattern]
            self.__logging and logger.warning(
                "Wrong pattern value, Defaulting to `zmq.PAIR`! Kindly refer Docs for more Information."
            )
        # assign pattern to global parameter for further use
        self.__pattern = pattern

        # Handle messaging protocol
        if protocol is None or not (protocol in ["tcp", "ipc"]):
            # else default to `tcp` protocol
            protocol = "tcp"
            # log it
            self.__logging and logger.warning(
                "Protocol is not supported or not provided. Defaulting to `tcp` protocol!"
            )

        # Handle connection params

        self.__msg_flag = 0  # handles connection flags
        self.__msg_copy = False  # handles whether to copy data
        self.__msg_track = False  # handles whether to track packets

        # Handle NetGear's internal exclusive modes and params

        # define Secure Mode
        self.__z_auth = None

        # define SSH Tunneling Mode
        self.__ssh_tunnel_mode = None  # handles ssh_tunneling mode state
        self.__ssh_tunnel_pwd = None
        self.__ssh_tunnel_keyfile = None
        self.__paramiko_present = False if paramiko is None else True

        # define Multi-Server mode
        self.__multiserver_mode = False  # handles multi-server mode state

        # define Multi-Client mode
        self.__multiclient_mode = False  # handles multi-client mode state

        # define Bidirectional mode
        self.__bi_mode = False  # handles Bidirectional mode state

        # define Secure mode
        valid_security_mech = {0: "Grasslands", 1: "StoneHouse", 2: "IronHouse"}
        self.__secure_mode = 0  # handles ZMQ security layer status
        auth_cert_dir = ""  # handles valid ZMQ certificates dir
        self.__auth_publickeys_dir = ""  # handles valid ZMQ public certificates dir
        self.__auth_secretkeys_dir = ""  # handles valid ZMQ private certificates dir
        overwrite_cert = False  # checks if certificates overwriting allowed
        custom_cert_location = ""  # handles custom ZMQ certificates path

        # define frame-compression handler
        self.__jpeg_compression = (
            True if not (simplejpeg is None) else False
        )  # enabled by default for all connections if simplejpeg is installed
        self.__jpeg_compression_quality = 90  # 90% quality
        self.__jpeg_compression_fastdct = True  # fastest DCT on by default
        self.__jpeg_compression_fastupsample = False  # fastupsample off by default
        self.__jpeg_compression_colorspace = "BGR"  # use BGR colorspace by default

        # defines frame compression on return data
        self.__ex_compression_params = None

        # define receiver return data handler
        self.__return_data = None

        # generate 8-digit random system id
        self.__id = "".join(
            secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
        )

        # define termination flag
        self.__terminate = False

        # additional settings for reliability
        if pattern < 2:
            # define zmq poller for reliable transmission
            self.__poll = zmq.Poller()
            # define max retries
            self.__max_retries = 3
            # request timeout
            self.__request_timeout = 4000  # 4 secs
        else:
            # subscriber timeout
            self.__subscriber_timeout = None

        # Handle user-defined options dictionary values

        # reformat dictionary
        options = {str(k).strip(): v for k, v in options.items()}

        # loop over dictionary key & values and assign to global variables if valid
        for key, value in options.items():
            # handle multi-server mode
            if key == "multiserver_mode" and isinstance(value, bool):
                # check if valid pattern assigned
                if pattern > 0:
                    # activate Multi-server mode
                    self.__multiserver_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__multiserver_mode = False
                    logger.critical("Multi-Server Mode is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Server Mode is enabled. Kindly refer Docs for more Information.".format(
                            pattern
                        )
                    )

            # handle multi-client mode
            elif key == "multiclient_mode" and isinstance(value, bool):
                # check if valid pattern assigned
                if pattern > 0:
                    # activate Multi-client mode
                    self.__multiclient_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__multiclient_mode = False
                    logger.critical("Multi-Client Mode is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Client Mode is enabled. Kindly refer Docs for more Information.".format(
                            pattern
                        )
                    )

            # handle bidirectional mode
            elif key == "bidirectional_mode" and isinstance(value, bool):
                # check if pattern is valid
                if pattern < 2:
                    # activate Bidirectional mode if specified
                    self.__bi_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__bi_mode = False
                    logger.warning("Bidirectional data transmission is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                            pattern
                        )
                    )

            # handle secure mode
            elif (
                key == "secure_mode"
                and isinstance(value, int)
                and (value in valid_security_mech)
            ):
                self.__secure_mode = value

            elif key == "custom_cert_location" and isinstance(value, str):
                # verify custom auth certificates path for secure mode
                custom_cert_location = os.path.abspath(value)
                assert os.path.isdir(
                    custom_cert_location
                ), "[NetGear:ERROR] :: `custom_cert_location` value must be the path to a valid directory!"
                assert check_WriteAccess(
                    custom_cert_location,
                    is_windows=True if os.name == "nt" else False,
                    logging=self.__logging,
                ), "[NetGear:ERROR] :: Permission Denied!, cannot write ZMQ authentication certificates to '{}' directory!".format(
                    value
                )
            elif key == "overwrite_cert" and isinstance(value, bool):
                # enable/disable auth certificate overwriting in secure mode
                overwrite_cert = value

            # handle ssh-tunneling mode
            elif key == "ssh_tunnel_mode" and isinstance(value, str):
                # enable SSH Tunneling Mode
                self.__ssh_tunnel_mode = value.strip()
            elif key == "ssh_tunnel_pwd" and isinstance(value, str):
                # add valid SSH Tunneling password
                self.__ssh_tunnel_pwd = value
            elif key == "ssh_tunnel_keyfile" and isinstance(value, str):
                # add valid SSH Tunneling key-file
                self.__ssh_tunnel_keyfile = value if os.path.isfile(value) else None
                if self.__ssh_tunnel_keyfile is None:
                    logger.warning(
                        "Discarded invalid or non-existential SSH Tunnel Key-file at {}!".format(
                            value
                        )
                    )

            # handle jpeg compression
            elif (
                key == "jpeg_compression"
                and not (simplejpeg is None)
                and isinstance(value, (bool, str))
            ):
                if isinstance(value, str) and value.strip().upper() in [
                    "RGB",
                    "BGR",
                    "RGBX",
                    "BGRX",
                    "XBGR",
                    "XRGB",
                    "GRAY",
                    "RGBA",
                    "BGRA",
                    "ABGR",
                    "ARGB",
                    "CMYK",
                ]:
                    # set encoding colorspace
                    self.__jpeg_compression_colorspace = value.strip().upper()
                    # enable frame-compression encoding value
                    self.__jpeg_compression = True
                else:
                    # enable frame-compression encoding value
                    self.__jpeg_compression = value
            elif key == "jpeg_compression_quality" and isinstance(value, (int, float)):
                # set valid jpeg quality
                if value >= 10 and value <= 100:
                    self.__jpeg_compression_quality = int(value)
                else:
                    logger.warning("Skipped invalid `jpeg_compression_quality` value!")
            elif key == "jpeg_compression_fastdct" and isinstance(value, bool):
                # enable jpeg fastdct
                self.__jpeg_compression_fastdct = value
            elif key == "jpeg_compression_fastupsample" and isinstance(value, bool):
                # enable jpeg fastupsample
                self.__jpeg_compression_fastupsample = value

            # assign maximum retries in synchronous patterns
            elif key == "max_retries" and isinstance(value, int) and pattern < 2:
                if value >= 0:
                    self.__max_retries = value
                else:
                    logger.warning("Invalid `max_retries` value skipped!")

            # assign request timeout in synchronous patterns
            elif key == "request_timeout" and isinstance(value, int) and pattern < 2:
                if value >= 4:
                    self.__request_timeout = value * 1000  # convert to milliseconds
                else:
                    logger.warning("Invalid `request_timeout` value skipped!")

            # assign subscriber timeout
            elif (
                key == "subscriber_timeout"
                and isinstance(value, int)
                and pattern == 2
            ):
                if value > 0:
                    self.__subscriber_timeout = value * 1000  # convert to milliseconds
                else:
                    logger.warning("Invalid `subscriber_timeout` value skipped!")

            # handle ZMQ flags
            elif key == "flag" and isinstance(value, int):
                self.__msg_flag = value
                self.__msg_flag and logger.warning(
                    "The flag optional value is set to `1` (NOBLOCK) for this run. This might cause NetGear to not terminate gracefully."
                )
            elif key == "copy" and isinstance(value, bool):
                self.__msg_copy = value
            elif key == "track" and isinstance(value, bool):
                self.__msg_track = value
                self.__msg_copy and self.__msg_track and logger.info(
                    "The `track` optional value will be ignored for this run because `copy=True` is also defined."
                )
            else:
                pass

        # Handle ssh tunneling if enabled
        if not (self.__ssh_tunnel_mode is None):
            # SSH Tunnel Mode only available for server mode
            if receive_mode:
                logger.error("SSH Tunneling cannot be enabled for Client-end!")
            else:
                # check if SSH tunneling possible
                ssh_address = self.__ssh_tunnel_mode
                ssh_address, ssh_port = (
                    ssh_address.split(":")
                    if ":" in ssh_address
                    else [ssh_address, "22"]
                )  # default to port 22
                if "47" in ssh_port:
                    self.__ssh_tunnel_mode = self.__ssh_tunnel_mode.replace(
                        ":47", ""
                    )  # port-47 is reserved for testing
                else:
                    # extract ip for validation
                    ssh_user, ssh_ip = (
                        ssh_address.split("@")
                        if "@" in ssh_address
                        else ["", ssh_address]
                    )
                    # validate ip specified port
                    assert check_open_port(
                        ssh_ip, port=int(ssh_port)
                    ), "[NetGear:ERROR] :: Host `{}` is not available for SSH Tunneling at port-{}!".format(
                        ssh_address, ssh_port
                    )

        # Handle multiple exclusive modes if enabled
        if self.__multiclient_mode and self.__multiserver_mode:
            raise ValueError(
                "[NetGear:ERROR] :: Multi-Client and Multi-Server Mode cannot be enabled simultaneously!"
            )
        elif self.__multiserver_mode or self.__multiclient_mode:
            # check if Bidirectional Mode also enabled
            if self.__bi_mode:
                # log it
                self.__logging and logger.debug(
                    "Bidirectional Data Transmission is also enabled for this connection!"
                )
            # check if SSH Tunneling Mode also enabled
            if self.__ssh_tunnel_mode:
                # raise error
                raise ValueError(
                    "[NetGear:ERROR] :: SSH Tunneling and {} Mode cannot be enabled simultaneously. Kindly refer docs!".format(
                        "Multi-Server" if self.__multiserver_mode else "Multi-Client"
                    )
                )
        elif self.__bi_mode:
            # log Bidirectional mode activation
            self.__logging and logger.debug(
                "Bidirectional Data Transmission is enabled for this connection!"
            )
        elif self.__ssh_tunnel_mode:
            # log SSH Tunneling mode activation
            self.__logging and logger.debug(
                "SSH Tunneling is enabled for host:`{}` with `{}` back-end.".format(
                    self.__ssh_tunnel_mode,
                    "paramiko" if self.__paramiko_present else "pexpect",
                )
            )

        # On Windows, NetGear requires the ``WindowsSelectorEventLoop`` but Python 3.8 and above,
        # defaults to an ``ProactorEventLoop`` loop that is not compatible with it. Thereby,
        # we had to set it manually.
        platform.system() == "Windows" and asyncio.set_event_loop_policy(
            asyncio.WindowsSelectorEventLoopPolicy()
        )

        # define ZMQ messaging context instance
        self.__msg_context = zmq.Context.instance()

        # initialize and assign receive mode to global variable
        self.__receive_mode = receive_mode

        # Handle Secure mode
        if self.__secure_mode > 0:
            # activate and log if overwriting is enabled
            if receive_mode:
                overwrite_cert = False
                overwrite_cert and logger.warning(
                    "Overwriting ZMQ Authentication certificates is disabled for Client's end!"
                )
            else:
                overwrite_cert and self.__logging and logger.info(
                    "Overwriting ZMQ Authentication certificates over previous ones!"
                )

            # Validate certificate generation paths
            # Start threaded authenticator for this context
            try:
                # check if custom certificates path is specified
                if custom_cert_location:
                    (
                        auth_cert_dir,
                        self.__auth_secretkeys_dir,
                        self.__auth_publickeys_dir,
                    ) = generate_auth_certificates(
                        custom_cert_location, overwrite=overwrite_cert, logging=logging
                    )
                else:
                    # otherwise auto-generate suitable path
                    (
                        auth_cert_dir,
                        self.__auth_secretkeys_dir,
                        self.__auth_publickeys_dir,
                    ) = generate_auth_certificates(
                        os.path.join(expanduser("~"), ".vidgear"),
                        overwrite=overwrite_cert,
                        logging=logging,
                    )
                # log it
                self.__logging and logger.debug(
                    "`{}` is the default location for storing ZMQ authentication certificates/keys.".format(
                        auth_cert_dir
                    )
                )

                # start an authenticator for this context
                self.__z_auth = ThreadAuthenticator(self.__msg_context)
                self.__z_auth.start()
                self.__z_auth.allow(str(address))  # allow current address

                # check if `IronHouse` is activated
                if self.__secure_mode == 2:
                    # tell authenticator to use the certificate from given valid dir
                    self.__z_auth.configure_curve(
                        domain="*", location=self.__auth_publickeys_dir
                    )
                else:
                    # otherwise tell the authenticator how to handle the CURVE requests, if `StoneHouse` is activated
                    self.__z_auth.configure_curve(
                        domain="*", location=auth.CURVE_ALLOW_ANY
                    )
            except zmq.ZMQError as e:
                if "Address in use" in str(e):
                    logger.info("ZMQ Authenticator already running.")
                else:
                    # catch if any error occurred and disable Secure mode
                    logger.exception(str(e))
                    self.__secure_mode = 0
                    logger.error(
                        "ZMQ Security Mechanism is disabled for this connection due to errors!"
                    )

        # check whether `receive_mode` is enabled
        if self.__receive_mode:
            # define connection address
            address = "*" if address is None else address

            # check if multiserver_mode is enabled
            if self.__multiserver_mode:
                # check if unique server port address list/tuple is assigned or not in multiserver_mode
                if port is None or not isinstance(port, (tuple, list)):
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Server ports while Multi-Server mode is enabled. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Server Mode at PORTS: {}!".format(port)
                    )
                # create port address buffer for keeping track of connected client's port(s)
                self.__port_buffer = []
            # check if multiclient_mode is enabled
            elif self.__multiclient_mode:
                # check if unique server port address is assigned or not in multiclient_mode
                if port is None:
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Client-end. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Client Mode at PORT: {} on this device!".format(
                            port
                        )
                    )
                # assign value to global variable
                self.__port = port
            else:
                # otherwise assign local port address if None
                port = "5555" if port is None else port

            try:
                # define thread-safe messaging socket
                self.__msg_socket = self.__msg_context.socket(msg_pattern[1])

                # define pub-sub flag
                self.__pattern == 2 and self.__msg_socket.set_hwm(1)

                # enable specified secure mode for the socket
                if self.__secure_mode > 0:
                    # load server key
                    server_secret_file = os.path.join(
                        self.__auth_secretkeys_dir, "server.key_secret"
                    )
                    server_public, server_secret = auth.load_certificate(
                        server_secret_file
                    )
                    # load all CURVE keys
                    self.__msg_socket.curve_secretkey = server_secret
                    self.__msg_socket.curve_publickey = server_public
                    # enable CURVE connection for this socket
                    self.__msg_socket.curve_server = True

                # define exclusive socket options for `patterns=2`
                if self.__pattern == 2:
                    self.__msg_socket.setsockopt_string(zmq.SUBSCRIBE, "")
                    self.__subscriber_timeout and self.__msg_socket.setsockopt(
                        zmq.RCVTIMEO, self.__subscriber_timeout
                    )
                    self.__subscriber_timeout and self.__msg_socket.setsockopt(
                        zmq.LINGER, 0
                    )

                # if multiserver_mode is enabled, then assign port addresses to zmq socket
                if self.__multiserver_mode:
                    # bind socket to given server protocol, address and ports
                    for pt in port:
                        self.__msg_socket.bind(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    # bind socket to given protocol, address and port normally
                    self.__msg_socket.bind(
                        protocol + "://" + str(address) + ":" + str(port)
                    )

                # additional settings
                if pattern < 2:
                    if self.__multiserver_mode:
                        self.__connection_address = []
                        for pt in port:
                            self.__connection_address.append(
                                protocol + "://" + str(address) + ":" + str(pt)
                            )
                    else:
                        self.__connection_address = (
                            protocol + "://" + str(address) + ":" + str(port)
                        )
                    self.__msg_pattern = msg_pattern[1]
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)
                    self.__logging and logger.debug(
                        "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                            self.__max_retries, self.__request_timeout / 1000
                        )
                    )
                else:
                    self.__logging and self.__subscriber_timeout and logger.debug(
                        "Timeout: {} secs is enabled for this system.".format(
                            self.__subscriber_timeout / 1000
                        )
                    )

            except Exception as e:
                # otherwise log and raise error
                logger.exception(str(e))
                # Handle Secure Mode
                self.__secure_mode and logger.critical(
                    "Failed to activate Secure Mode: `{}` for this connection!".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                # raise errors for exclusive modes
                if self.__multiserver_mode or self.__multiclient_mode:
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Receive Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                            (
                                "Multi-Server"
                                if self.__multiserver_mode
                                else "Multi-Client"
                            ),
                            (protocol + "://" + str(address) + ":" + str(port)),
                            pattern,
                        )
                    )
                else:
                    self.__bi_mode and logger.critical(
                        "Failed to activate Bidirectional Mode for this connection!"
                    )
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Receive Mode failed to bind address: {} and pattern: {}! Kindly recheck all parameters.".format(
                            (protocol + "://" + str(address) + ":" + str(port)), pattern
                        )
                    )

            # Handle threaded queue mode
            self.__logging and logger.debug(
                "Threaded Queue Mode is enabled by default for this connection."
            )

            # define deque and assign it to global var
            self.__queue = deque(maxlen=96)  # max len 96 to check overflow

            # initialize and start threaded recv_handler
            self.__thread = Thread(target=self.__recv_handler, name="NetGear", args=())
            self.__thread.daemon = True
            self.__thread.start()

            if self.__logging:
                # finally log progress
                logger.debug(
                    "Successfully Binded to address: {} with pattern: {}.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )
                self.__jpeg_compression and logger.debug(
                    "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                        self.__jpeg_compression_colorspace,
                        self.__jpeg_compression_quality,
                        ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                        (
                            "enabled"
                            if self.__jpeg_compression_fastupsample
                            else "disabled"
                        ),
                    )
                )
                self.__secure_mode and logger.debug(
                    "Successfully enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                logger.debug("Multi-threaded Receive Mode is successfully enabled.")
                logger.debug("Unique System ID is {}.".format(self.__id))
                logger.debug("Receive Mode is now activated.")

        else:
            # otherwise default to `Send Mode`

            # define connection address
            address = "localhost" if address is None else address

            # check if multiserver_mode is enabled
            if self.__multiserver_mode:
                # check if unique server port address is assigned or not in multiserver_mode
                if port is None:
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Server-end. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Server Mode at PORT: {} on this device!".format(
                            port
                        )
                    )
                # assign value to global variable
                self.__port = port
            # check if multiclient_mode is enabled
            elif self.__multiclient_mode:
                # check if unique client port address list/tuple is assigned or not in multiclient_mode
                if port is None or not isinstance(port, (tuple, list)):
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Client ports while Multi-Client mode is enabled. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Client Mode at PORTS: {}!".format(port)
                    )
                # create port address buffer for keeping track of connected client ports
                self.__port_buffer = []
            else:
                # otherwise assign local port address if None
                port = "5555" if port is None else port

            try:
                # define thread-safe messaging socket
                self.__msg_socket = self.__msg_context.socket(msg_pattern[0])

                # if req/rep pattern, define additional flags
                if self.__pattern == 1:
                    self.__msg_socket.REQ_RELAXED = True
                    self.__msg_socket.REQ_CORRELATE = True

                # if pub/sub pattern, define additional optimizer
                if self.__pattern == 2:
                    self.__msg_socket.set_hwm(1)

                # enable specified secure mode for the socket
                if self.__secure_mode > 0:
                    # load client key
                    client_secret_file = os.path.join(
                        self.__auth_secretkeys_dir, "client.key_secret"
                    )
                    client_public, client_secret = auth.load_certificate(
                        client_secret_file
                    )
                    # load all CURVE keys
                    self.__msg_socket.curve_secretkey = client_secret
                    self.__msg_socket.curve_publickey = client_public
                    # load server key
                    server_public_file = os.path.join(
                        self.__auth_publickeys_dir, "server.key"
                    )
                    server_public, _ = auth.load_certificate(server_public_file)
                    # inject public key to make a CURVE connection.
                    self.__msg_socket.curve_serverkey = server_public

                # check if multi-client_mode is enabled
                if self.__multiclient_mode:
                    # connect socket to given server protocol, address and ports
                    for pt in port:
                        self.__msg_socket.connect(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    # handle SSH tunneling if enabled
                    if self.__ssh_tunnel_mode:
                        # establish tunnel connection
                        ssh.tunnel_connection(
                            self.__msg_socket,
                            protocol + "://" + str(address) + ":" + str(port),
                            self.__ssh_tunnel_mode,
                            keyfile=self.__ssh_tunnel_keyfile,
                            password=self.__ssh_tunnel_pwd,
                            paramiko=self.__paramiko_present,
                        )
                    else:
                        # connect socket to given protocol, address and port
                        self.__msg_socket.connect(
                            protocol + "://" + str(address) + ":" + str(port)
                        )

                # additional settings
                if pattern < 2:
                    if self.__multiclient_mode:
                        self.__connection_address = []
                        for pt in port:
                            self.__connection_address.append(
                                protocol + "://" + str(address) + ":" + str(pt)
                            )
                    else:
                        self.__connection_address = (
                            protocol + "://" + str(address) + ":" + str(port)
                        )
                    self.__msg_pattern = msg_pattern[0]
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)

                    self.__logging and logger.debug(
                        "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                            self.__max_retries, self.__request_timeout / 1000
                        )
                    )

            except Exception as e:
                # otherwise log and raise error
                logger.exception(str(e))
                # Handle Secure Mode
                self.__secure_mode and logger.critical(
                    "Failed to activate Secure Mode: `{}` for this connection!".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                # raise errors for exclusive modes
                if self.__multiserver_mode or self.__multiclient_mode:
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Send Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                            (
                                "Multi-Server"
                                if self.__multiserver_mode
                                else "Multi-Client"
                            ),
                            (protocol + "://" + str(address) + ":" + str(port)),
                            pattern,
                        )
                    )
                else:
                    self.__bi_mode and logger.critical(
                        "Failed to activate Bidirectional Mode for this connection!"
                    )
                    self.__ssh_tunnel_mode and logger.critical(
                        "Failed to initiate SSH Tunneling Mode for this server with `{}` back-end!".format(
                            "paramiko" if self.__paramiko_present else "pexpect"
                        )
                    )
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Send Mode failed to connect address: {} and pattern: {}! Kindly recheck all parameters.".format(
                            (protocol + "://" + str(address) + ":" + str(port)), pattern
                        )
                    )

            if self.__logging:
                # finally log progress
                logger.debug(
                    "Successfully connected to address: {} with pattern: {}.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )
                self.__jpeg_compression and logger.debug(
                    "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                        self.__jpeg_compression_colorspace,
                        self.__jpeg_compression_quality,
                        ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                        (
                            "enabled"
                            if self.__jpeg_compression_fastupsample
                            else "disabled"
                        ),
                    )
                )
                self.__secure_mode and logger.debug(
                    "Enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                logger.debug("Unique System ID is {}.".format(self.__id))
                logger.debug(
                    "Send Mode is successfully activated and ready to send data."
                )

    def __recv_handler(self):
        """
        A threaded receiver handler, that keep iterating data from ZMQ socket to a internally monitored deque,
        until the thread is terminated, or socket disconnects.
        """
        # initialize variables
        frame = None
        msg_json = None

        # keep looping infinitely until the thread is terminated
        while not self.__terminate:
            # check queue buffer for overflow
            if len(self.__queue) >= 96:
                # stop iterating if overflowing occurs
                time.sleep(0.000001)
                continue

            if self.__pattern < 2:
                socks = dict(self.__poll.poll(self.__request_timeout * 3))
                if socks.get(self.__msg_socket) == zmq.POLLIN:
                    msg_json = self.__msg_socket.recv_json(
                        flags=self.__msg_flag | zmq.DONTWAIT
                    )
                else:
                    logger.critical("No response from Server(s), Reconnecting again...")
                    self.__msg_socket.close(linger=0)
                    self.__poll.unregister(self.__msg_socket)
                    self.__max_retries -= 1

                    if not (self.__max_retries):
                        if self.__multiserver_mode:
                            logger.error("All Servers seems to be offline, Abandoning!")
                        else:
                            logger.error("Server seems to be offline, Abandoning!")
                        self.__terminate = True
                        continue

                    # Create new connection
                    try:
                        self.__msg_socket = self.__msg_context.socket(
                            self.__msg_pattern
                        )
                        if isinstance(self.__connection_address, list):
                            for _connection in self.__connection_address:
                                self.__msg_socket.bind(_connection)
                        else:
                            self.__msg_socket.bind(self.__connection_address)
                    except Exception as e:
                        logger.exception(str(e))
                        self.__terminate = True
                        raise RuntimeError("API failed to restart the Client-end!")
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)

                    continue
            else:
                try:
                    msg_json = self.__msg_socket.recv_json(flags=self.__msg_flag)
                except zmq.ZMQError as e:
                    if e.errno == zmq.EAGAIN:
                        logger.critical("Connection Timeout. Exiting!")
                        self.__terminate = True
                        self.__queue.append(None)
                        break

            # check if `terminate_flag` received
            if msg_json and msg_json["terminate_flag"]:
                # if multiserver_mode is enabled
                if self.__multiserver_mode:
                    # check and remove from which ports signal is received
                    if msg_json["port"] in self.__port_buffer:
                        # if pattern is 1, then send back server the info about termination
                        if self.__pattern == 1:
                            self.__msg_socket.send_string(
                                "Termination signal successfully received at client!"
                            )
                        self.__port_buffer.remove(msg_json["port"])
                        self.__logging and logger.warning(
                            "Termination signal received from Server at port: {}!".format(
                                msg_json["port"]
                            )
                        )
                    # if termination signal received from all servers then exit client.
                    if not self.__port_buffer:
                        logger.critical(
                            "Termination signal received from all Servers!!!"
                        )
                        self.__terminate = True  # termination
                else:
                    # if pattern is 1, then send back server the info about termination
                    if self.__pattern == 1:
                        self.__msg_socket.send_string(
                            "Termination signal successfully received at Client's end!"
                        )
                    # termination
                    self.__terminate = True
                    # notify client
                    self.__logging and logger.critical(
                        "Termination signal received from server!"
                    )
                continue

            try:
                msg_data = self.__msg_socket.recv(
                    flags=self.__msg_flag | zmq.DONTWAIT,
                    copy=self.__msg_copy,
                    track=self.__msg_track,
                )
            except zmq.ZMQError as e:
                logger.critical("Socket Session Expired. Exiting!")
                self.__terminate = True
                self.__queue.append(None)
                break

            # handle data transfer in synchronous modes.
            if self.__pattern < 2:
                if self.__bi_mode or self.__multiclient_mode:
                    # check if we are returning `ndarray` frames
                    if not (self.__return_data is None) and isinstance(
                        self.__return_data, np.ndarray
                    ):
                        # handle return data for compression
                        return_data = np.copy(self.__return_data)

                        # check whether exit_flag is False
                        if not (return_data.flags["C_CONTIGUOUS"]):
                            # check whether the incoming frame is contiguous
                            return_data = np.ascontiguousarray(
                                return_data, dtype=return_data.dtype
                            )

                        # handle jpeg-compression encoding
                        if self.__jpeg_compression:
                            if self.__jpeg_compression_colorspace == "GRAY":
                                if return_data.ndim == 2:
                                    # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                                    return_data = return_data[:, :, np.newaxis]
                                return_data = simplejpeg.encode_jpeg(
                                    return_data,
                                    quality=self.__jpeg_compression_quality,
                                    colorspace=self.__jpeg_compression_colorspace,
                                    fastdct=self.__jpeg_compression_fastdct,
                                )
                            else:
                                return_data = simplejpeg.encode_jpeg(
                                    return_data,
                                    quality=self.__jpeg_compression_quality,
                                    colorspace=self.__jpeg_compression_colorspace,
                                    colorsubsampling="422",
                                    fastdct=self.__jpeg_compression_fastdct,
                                )

                        return_dict = (
                            dict(port=self.__port)
                            if self.__multiclient_mode
                            else dict()
                        )

                        return_dict.update(
                            dict(
                                return_type=(type(self.__return_data).__name__),
                                compression=(
                                    {
                                        "dct": self.__jpeg_compression_fastdct,
                                        "ups": self.__jpeg_compression_fastupsample,
                                        "colorspace": self.__jpeg_compression_colorspace,
                                    }
                                    if self.__jpeg_compression
                                    else False
                                ),
                                array_dtype=(
                                    str(self.__return_data.dtype)
                                    if not (self.__jpeg_compression)
                                    else ""
                                ),
                                array_shape=(
                                    self.__return_data.shape
                                    if not (self.__jpeg_compression)
                                    else ""
                                ),
                                data=None,
                            )
                        )

                        # send the json dict
                        self.__msg_socket.send_json(
                            return_dict, self.__msg_flag | zmq.SNDMORE
                        )
                        # send the array with correct flags
                        self.__msg_socket.send(
                            return_data,
                            flags=self.__msg_flag,
                            copy=self.__msg_copy,
                            track=self.__msg_track,
                        )
                    else:
                        return_dict = (
                            dict(port=self.__port)
                            if self.__multiclient_mode
                            else dict()
                        )
                        return_dict.update(
                            dict(
                                return_type=(type(self.__return_data).__name__),
                                data=self.__return_data,
                            )
                        )
                        self.__msg_socket.send_json(return_dict, self.__msg_flag)
                else:
                    # send confirmation message to server
                    self.__msg_socket.send_string(
                        "Data received on device: {} !".format(self.__id)
                    )
            else:
                # else raise warning
                if self.__return_data:
                    logger.warning("`return_data` is disabled for this pattern!")

            # check if encoding was enabled
            if msg_json["compression"]:
                # decode JPEG frame
                frame = simplejpeg.decode_jpeg(
                    msg_data,
                    colorspace=msg_json["compression"]["colorspace"],
                    fastdct=self.__jpeg_compression_fastdct
                    or msg_json["compression"]["dct"],
                    fastupsample=self.__jpeg_compression_fastupsample
                    or msg_json["compression"]["ups"],
                )
                # check if valid frame returned
                if frame is None:
                    self.__terminate = True
                    # otherwise raise error and exit
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Received compressed JPEG frame decoding failed"
                    )
                if msg_json["compression"]["colorspace"] == "GRAY" and frame.ndim == 3:
                    # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                    frame = np.squeeze(frame, axis=2)
            else:
                # recover and reshape frame from buffer
                frame_buffer = np.frombuffer(msg_data, dtype=msg_json["dtype"])
                frame = frame_buffer.reshape(msg_json["shape"])

            # check if multiserver_mode
            if self.__multiserver_mode:
                # save the unique port addresses
                if not msg_json["port"] in self.__port_buffer:
                    self.__port_buffer.append(msg_json["port"])
                # extract if any message from server and display it
                if msg_json["message"]:
                    self.__queue.append(
                        (msg_json["port"], msg_json["message"], frame)
                    )
                else:
                    # append recovered unique port and frame to queue
                    self.__queue.append((msg_json["port"], frame))
            # extract if any message from server if Bidirectional Mode is enabled
            elif self.__bi_mode:
                if msg_json["message"]:
                    # append grouped frame and data to
queueself.__queue.append((msg_json["message"],frame))else:self.__queue.append((None,frame))else:# otherwise append recovered frame to queueself.__queue.append(frame)defrecv(self,return_data=None)->Optional[NDArray]:""" A Receiver end method, that extracts received frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full. Parameters: return_data (any): inputs return data _(of any datatype)_, for sending back to Server. **Returns:** A n-dimensional numpy array. """# check whether `receive mode` is activatedifnot(self.__receive_mode):# raise value error and exitself.__terminate=TrueraiseValueError("[NetGear:ERROR] :: `recv()` function cannot be used while receive_mode is disabled. Kindly refer vidgear docs!")# handle Bidirectional return dataif(self.__bi_modeorself.__multiclient_mode)andnot(return_dataisNone):self.__return_data=return_data# check whether or not termination flag is enabledwhilenotself.__terminate:try:# check if queue is emptyiflen(self.__queue)>0:returnself.__queue.popleft()else:time.sleep(0.00001)continueexceptKeyboardInterrupt:self.__terminate=Truebreak# otherwise return NoneTypereturnNonedefsend(self,frame:NDArray,message:Any=None)->Optional[Any]:""" A Server end method, that sends the data and frames over the network to Client(s). Parameters: frame (numpy.ndarray): inputs numpy array(frame). message (any): input for sending additional data _(of any datatype except `numpy.ndarray`)_ to Client(s). **Returns:** Data _(of any datatype)_ in selected exclusive modes, otherwise None-type. """# check whether `receive_mode` is disabledifself.__receive_mode:# raise value error and exitself.__terminate=TrueraiseValueError("[NetGear:ERROR] :: `send()` function cannot be used while receive_mode is enabled. 
Kindly refer vidgear docs!")ifnot(messageisNone)andisinstance(message,np.ndarray):logger.warning("Skipped unsupported `message` of datatype: {}!".format(type(message).__name__))message=None# define exit_flag and assign valueexit_flag=Trueif(frameisNoneorself.__terminate)elseFalse# check whether exit_flag is Falseifnot(exit_flag)andnot(frame.flags["C_CONTIGUOUS"]):# check whether the incoming frame is contiguousframe=np.ascontiguousarray(frame,dtype=frame.dtype)# handle JPEG compression encodingifself.__jpeg_compression:ifself.__jpeg_compression_colorspace=="GRAY":ifframe.ndim==2:# patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11frame=np.expand_dims(frame,axis=2)frame=simplejpeg.encode_jpeg(frame,quality=self.__jpeg_compression_quality,colorspace=self.__jpeg_compression_colorspace,fastdct=self.__jpeg_compression_fastdct,)else:frame=simplejpeg.encode_jpeg(frame,quality=self.__jpeg_compression_quality,colorspace=self.__jpeg_compression_colorspace,colorsubsampling="422",fastdct=self.__jpeg_compression_fastdct,)# check if multiserver_mode is activated and assign values with unique portmsg_dict=dict(port=self.__port)ifself.__multiserver_modeelsedict()# prepare the exclusive json dictmsg_dict.update(dict(terminate_flag=exit_flag,compression=({"dct":self.__jpeg_compression_fastdct,"ups":self.__jpeg_compression_fastupsample,"colorspace":self.__jpeg_compression_colorspace,}ifself.__jpeg_compressionelseFalse),message=message,pattern=str(self.__pattern),dtype=str(frame.dtype)ifnot(self.__jpeg_compression)else"",shape=frame.shapeifnot(self.__jpeg_compression)else"",))# send the json dictself.__msg_socket.send_json(msg_dict,self.__msg_flag|zmq.SNDMORE)# send the frame array with correct flagsself.__msg_socket.send(frame,flags=self.__msg_flag,copy=self.__msg_copy,track=self.__msg_track)# check if synchronous patterns, then wait for confirmationifself.__pattern<2:# check if Bidirectional data transmission is enabledifself.__bi_modeorself.__multiclient_mode:# handles 
return datarecvd_data=Nonesocks=dict(self.__poll.poll(self.__request_timeout))ifsocks.get(self.__msg_socket)==zmq.POLLIN:# handle return datarecv_json=self.__msg_socket.recv_json(flags=self.__msg_flag)else:logger.critical("No response from Client, Reconnecting again...")# Socket is confused. Close and remove it.self.__msg_socket.setsockopt(zmq.LINGER,0)self.__msg_socket.close()self.__poll.unregister(self.__msg_socket)self.__max_retries-=1ifnot(self.__max_retries):ifself.__multiclient_mode:logger.error("All Clients failed to respond on multiple attempts.")else:logger.error("Client failed to respond on multiple attempts.")self.__terminate=TrueraiseRuntimeError("[NetGear:ERROR] :: Client(s) seems to be offline, Abandoning.")# Create new connectionself.__msg_socket=self.__msg_context.socket(self.__msg_pattern)ifisinstance(self.__connection_address,list):for_connectioninself.__connection_address:self.__msg_socket.connect(_connection)else:# handle SSH tunneling if enabledifself.__ssh_tunnel_mode:# establish tunnel connectionssh.tunnel_connection(self.__msg_socket,self.__connection_address,self.__ssh_tunnel_mode,keyfile=self.__ssh_tunnel_keyfile,password=self.__ssh_tunnel_pwd,paramiko=self.__paramiko_present,)else:# connect normallyself.__msg_socket.connect(self.__connection_address)self.__poll.register(self.__msg_socket,zmq.POLLIN)# return None for mean-timereturnNone# save the unique port addressesif(self.__multiclient_modeandnotrecv_json["port"]inself.__port_buffer):self.__port_buffer.append(recv_json["port"])ifrecv_json["return_type"]=="ndarray":recv_array=self.__msg_socket.recv(flags=self.__msg_flag,copy=self.__msg_copy,track=self.__msg_track,)# check if encoding was enabledifrecv_json["compression"]:# decode JPEG 
framerecvd_data=simplejpeg.decode_jpeg(recv_array,colorspace=recv_json["compression"]["colorspace"],fastdct=self.__jpeg_compression_fastdctorrecv_json["compression"]["dct"],fastupsample=self.__jpeg_compression_fastupsampleorrecv_json["compression"]["ups"],)# check if valid frame returnedifrecvd_dataisNone:self.__terminate=True# otherwise raise error and exitraiseRuntimeError("[NetGear:ERROR] :: Received compressed frame `{}` decoding failed with flag: {}.".format(recv_json["compression"],self.__ex_compression_params,))if(recv_json["compression"]["colorspace"]=="GRAY"andrecvd_data.ndim==3):# patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11recvd_data=np.squeeze(recvd_data,axis=2)else:recvd_data=np.frombuffer(recv_array,dtype=recv_json["array_dtype"]).reshape(recv_json["array_shape"])else:recvd_data=recv_json["data"]return((recv_json["port"],recvd_data)ifself.__multiclient_modeelserecvd_data)else:# otherwise log normallysocks=dict(self.__poll.poll(self.__request_timeout))ifsocks.get(self.__msg_socket)==zmq.POLLIN:recv_confirmation=self.__msg_socket.recv()else:logger.critical("No response from Client, Reconnecting again...")# Socket is confused. 
Close and remove it.self.__msg_socket.setsockopt(zmq.LINGER,0)self.__msg_socket.close()self.__poll.unregister(self.__msg_socket)self.__max_retries-=1ifnot(self.__max_retries):logger.error("Client failed to respond on repeated attempts.")self.__terminate=TrueraiseRuntimeError("[NetGear:ERROR] :: Client seems to be offline, Abandoning!")# Create new connectionself.__msg_socket=self.__msg_context.socket(self.__msg_pattern)# handle SSH tunneling if enabledifself.__ssh_tunnel_mode:# establish tunnel connectionssh.tunnel_connection(self.__msg_socket,self.__connection_address,self.__ssh_tunnel_mode,keyfile=self.__ssh_tunnel_keyfile,password=self.__ssh_tunnel_pwd,paramiko=self.__paramiko_present,)else:# connect normallyself.__msg_socket.connect(self.__connection_address)self.__poll.register(self.__msg_socket,zmq.POLLIN)returnNone# log confirmationself.__loggingandlogger.debug(recv_confirmation)defclose(self,kill:bool=False)->None:""" Safely terminates the threads, and NetGear resources. Parameters: kill (bool): Kills ZMQ context instead of graceful exiting in receive mode. """# log itself.__loggingandlogger.debug("Terminating various {} Processes.".format("Receive Mode"ifself.__receive_modeelse"Send Mode"))# whether `receive_mode` is enabled or notifself.__receive_mode:# check whether queue mode is emptyifnot(self.__queueisNone)andself.__queue:self.__queue.clear()# call immediate terminationself.__terminate=True# properly close the socketself.__loggingandlogger.debug("Terminating. 
Please wait...")# Handle Secure Mode Threadifself.__z_auth:self.__loggingandlogger.debug("Terminating Authenticator Thread.")self.__z_auth.stop()whileself.__z_auth.is_alive():pass# wait until stream resources are released# (producer thread might be still grabbing frame)ifself.__threadisnotNone:self.__loggingandlogger.debug("Terminating Main Thread.")# properly handle thread exitifself.__thread.is_alive()andkill:# force close if still alivelogger.warning("Thread still running...Killing it forcefully!")self.__msg_context.destroy()self.__thread.join()else:self.__msg_socket.close(linger=0)self.__thread.join()self.__thread=Noneself.__loggingandlogger.debug("Terminated Successfully!")else:# indicate that process should be terminatedself.__terminate=True# log if kill enabledkillandlogger.warning("`kill` parmeter is only available in the receive mode.")# Handle Secure Mode Threadifself.__z_auth:self.__loggingandlogger.debug("Terminating Authenticator Thread.")self.__z_auth.stop()whileself.__z_auth.is_alive():pass# check if all attempts of reconnecting failed, then skip to closureif(self.__pattern<2andnotself.__max_retries)or(self.__multiclient_modeandnotself.__port_buffer):try:# properly close the socketself.__msg_socket.setsockopt(zmq.LINGER,0)self.__msg_socket.close()exceptZMQError:passfinally:# exitreturnifself.__multiserver_mode:# check if multiserver_mode# send termination flag to client with its unique portterm_dict=dict(terminate_flag=True,port=self.__port)else:# otherwise send termination flag to clientterm_dict=dict(terminate_flag=True)try:ifself.__multiclient_mode:for_inself.__port_buffer:self.__msg_socket.send_json(term_dict)else:self.__msg_socket.send_json(term_dict)# check for confirmation if available within 1/5 timeoutifself.__pattern<2:self.__loggingandlogger.debug("Terminating. 
Please wait...")ifself.__msg_socket.poll(self.__request_timeout//5,zmq.POLLIN):self.__msg_socket.recv()exceptExceptionase:ifnotisinstance(e,ZMQError):logger.exception(str(e))finally:# properly close the socketself.__msg_socket.setsockopt(zmq.LINGER,0)self.__msg_socket.close()self.__loggingandlogger.debug("Terminated Successfully!")
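The receive path in the listing above hands frames from a background handler thread to `recv()` through a bounded `deque` (96 entries). The handler pauses briefly when the buffer is full, and `recv()` polls until an item is available. The following is a minimal, standard-library-only sketch of that hand-off, not the library's implementation: `FrameBuffer`, `drain`, and the iterable `source` (standing in for the ZMQ socket) are hypothetical names introduced for illustration, and end-of-stream handling is simplified.

```python
import threading
import time
from collections import deque

MAX_QUEUE = 96  # same bound the listing above uses for its deque


class FrameBuffer:
    """Hypothetical stand-in for the receive-side queue; not part of vidgear."""

    def __init__(self, source):
        self._source = iter(source)  # assumption: any iterable replaces the socket
        self._queue = deque(maxlen=MAX_QUEUE)
        self._terminate = False
        self._thread = threading.Thread(target=self._handler, daemon=True)
        self._thread.start()

    def _handler(self):
        # Producer: back off while the buffer is full, otherwise append.
        while not self._terminate:
            if len(self._queue) >= MAX_QUEUE:
                time.sleep(0.000001)
                continue
            try:
                item = next(self._source)
            except StopIteration:
                break  # source exhausted, producer thread ends
            self._queue.append(item)

    def recv(self):
        # Consumer: poll until an item arrives, the producer ends, or close() is called.
        while not self._terminate:
            # Snapshot liveness first so items appended just before exit are not missed.
            producer_alive = self._thread.is_alive()
            if self._queue:
                return self._queue.popleft()
            if not producer_alive:
                return None
            time.sleep(0.00001)
        return None

    def close(self):
        self._terminate = True
        self._thread.join()


def drain(buffer):
    """Collect items until the buffer reports end-of-stream (None)."""
    items = []
    while True:
        item = buffer.recv()
        if item is None:
            return items
        items.append(item)


buf = FrameBuffer(range(300))  # more items than the buffer holds at once
received = drain(buf)
buf.close()
print(len(received), received[:3])
```

Because only one thread appends and the producer checks the length before each append, the `maxlen` bound never silently discards items in this sketch; it acts purely as a back-pressure limit.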
    def __init__(
        self,
        address: str = None,
        port: str = None,
        protocol: str = None,
        pattern: int = 0,
        receive_mode: bool = False,
        logging: bool = False,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the NetGear class.

        Parameters:
            address (str): sets the valid network address of the Server/Client.
            port (str): sets the valid Network Port of the Server/Client.
            protocol (str): sets the valid messaging protocol between Server/Client.
            pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
            receive_mode (bool): select the Netgear's Mode of operation.
            logging (bool): enables/disables logging.
            options (dict): provides the flexibility to alter various NetGear internal properties.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        import_dependency_safe(
            "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
        )
        import_dependency_safe(
            "simplejpeg" if simplejpeg is None else "", error="log", min_version="1.6.1"
        )

        # define valid messaging patterns => `0`: zmq.PAIR, `1`: (zmq.REQ, zmq.REP), and `2`: (zmq.PUB, zmq.SUB)
        valid_messaging_patterns = {
            0: (zmq.PAIR, zmq.PAIR),
            1: (zmq.REQ, zmq.REP),
            2: (zmq.PUB, zmq.SUB),
        }

        # Handle messaging pattern
        msg_pattern = None
        # check whether user-defined messaging pattern is valid
        if isinstance(pattern, int) and pattern in valid_messaging_patterns.keys():
            # assign value
            msg_pattern = valid_messaging_patterns[pattern]
        else:
            # otherwise default to 0:`zmq.PAIR`
            pattern = 0
            msg_pattern = valid_messaging_patterns[pattern]
            self.__logging and logger.warning(
                "Wrong pattern value, Defaulting to `zmq.PAIR`! Kindly refer Docs for more Information."
            )
        # assign pattern to global parameter for further use
        self.__pattern = pattern

        # Handle messaging protocol
        if protocol is None or not (protocol in ["tcp", "ipc"]):
            # else default to `tcp` protocol
            protocol = "tcp"
            # log it
            self.__logging and logger.warning(
                "Protocol is not supported or not provided. Defaulting to `tcp` protocol!"
            )

        # Handle connection params
        self.__msg_flag = 0  # handles connection flags
        self.__msg_copy = False  # handles whether to copy data
        self.__msg_track = False  # handles whether to track packets

        # Handle NetGear's internal exclusive modes and params

        # define Secure Mode
        self.__z_auth = None

        # define SSH Tunneling Mode
        self.__ssh_tunnel_mode = None  # handles ssh_tunneling mode state
        self.__ssh_tunnel_pwd = None
        self.__ssh_tunnel_keyfile = None
        self.__paramiko_present = False if paramiko is None else True

        # define Multi-Server mode
        self.__multiserver_mode = False  # handles multi-server mode state

        # define Multi-Client mode
        self.__multiclient_mode = False  # handles multi-client mode state

        # define Bidirectional mode
        self.__bi_mode = False  # handles Bidirectional mode state

        # define Secure mode
        valid_security_mech = {0: "Grasslands", 1: "StoneHouse", 2: "IronHouse"}
        self.__secure_mode = 0  # handles ZMQ security layer status
        auth_cert_dir = ""  # handles valid ZMQ certificates dir
        self.__auth_publickeys_dir = ""  # handles valid ZMQ public certificates dir
        self.__auth_secretkeys_dir = ""  # handles valid ZMQ private certificates dir
        overwrite_cert = False  # checks if certificates overwriting allowed
        custom_cert_location = ""  # handles custom ZMQ certificates path

        # define frame-compression handler
        self.__jpeg_compression = (
            True if not (simplejpeg is None) else False
        )  # enabled by default for all connections if simplejpeg is installed
        self.__jpeg_compression_quality = 90  # 90% quality
        self.__jpeg_compression_fastdct = True  # fastest DCT on by default
        self.__jpeg_compression_fastupsample = False  # fastupsample off by default
        self.__jpeg_compression_colorspace = "BGR"  # use BGR colorspace by default

        # defines frame compression on return data
        self.__ex_compression_params = None

        # define receiver return data handler
        self.__return_data = None

        # generate 8-digit random system id
        self.__id = "".join(
            secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
        )

        # define termination flag
        self.__terminate = False

        # additional settings for reliability
        if pattern < 2:
            # define zmq poller for reliable transmission
            self.__poll = zmq.Poller()
            # define max retries
            self.__max_retries = 3
            # request timeout
            self.__request_timeout = 4000  # 4 secs
        else:
            # subscriber timeout
            self.__subscriber_timeout = None

        # Handle user-defined options dictionary values
        # reformat dictionary
        options = {str(k).strip(): v for k, v in options.items()}

        # loop over dictionary key & values and assign to global variables if valid
        for key, value in options.items():
            # handle multi-server mode
            if key == "multiserver_mode" and isinstance(value, bool):
                # check if valid pattern assigned
                if pattern > 0:
                    # activate Multi-server mode
                    self.__multiserver_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__multiserver_mode = False
                    logger.critical("Multi-Server Mode is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Server Mode is enabled. Kindly refer Docs for more Information.".format(
                            pattern
                        )
                    )

            # handle multi-client mode
            elif key == "multiclient_mode" and isinstance(value, bool):
                # check if valid pattern assigned
                if pattern > 0:
                    # activate Multi-client mode
                    self.__multiclient_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__multiclient_mode = False
                    logger.critical("Multi-Client Mode is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Client Mode is enabled. Kindly refer Docs for more Information.".format(
                            pattern
                        )
                    )

            # handle bidirectional mode
            elif key == "bidirectional_mode" and isinstance(value, bool):
                # check if pattern is valid
                if pattern < 2:
                    # activate Bidirectional mode if specified
                    self.__bi_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__bi_mode = False
                    logger.warning("Bidirectional data transmission is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                            pattern
                        )
                    )

            # handle secure mode
            elif (
                key == "secure_mode"
                and isinstance(value, int)
                and (value in valid_security_mech)
            ):
                self.__secure_mode = value

            elif key == "custom_cert_location" and isinstance(value, str):
                # verify custom auth certificates path for secure mode
                custom_cert_location = os.path.abspath(value)
                assert os.path.isdir(
                    custom_cert_location
                ), "[NetGear:ERROR] :: `custom_cert_location` value must be the path to a valid directory!"
                assert check_WriteAccess(
                    custom_cert_location,
                    is_windows=True if os.name == "nt" else False,
                    logging=self.__logging,
                ), "[NetGear:ERROR] :: Permission Denied!, cannot write ZMQ authentication certificates to '{}' directory!".format(
                    value
                )
            elif key == "overwrite_cert" and isinstance(value, bool):
                # enable/disable auth certificate overwriting in secure mode
                overwrite_cert = value

            # handle ssh-tunneling mode
            elif key == "ssh_tunnel_mode" and isinstance(value, str):
                # enable SSH Tunneling Mode
                self.__ssh_tunnel_mode = value.strip()
            elif key == "ssh_tunnel_pwd" and isinstance(value, str):
                # add valid SSH Tunneling password
                self.__ssh_tunnel_pwd = value
            elif key == "ssh_tunnel_keyfile" and isinstance(value, str):
                # add valid SSH Tunneling key-file
                self.__ssh_tunnel_keyfile = value if os.path.isfile(value) else None
                if self.__ssh_tunnel_keyfile is None:
                    logger.warning(
                        "Discarded invalid or non-existential SSH Tunnel Key-file at {}!".format(
                            value
                        )
                    )

            # handle jpeg compression
            elif (
                key == "jpeg_compression"
                and not (simplejpeg is None)
                and isinstance(value, (bool, str))
            ):
                if isinstance(value, str) and value.strip().upper() in [
                    "RGB",
                    "BGR",
                    "RGBX",
                    "BGRX",
                    "XBGR",
                    "XRGB",
                    "GRAY",
                    "RGBA",
                    "BGRA",
                    "ABGR",
                    "ARGB",
                    "CMYK",
                ]:
                    # set encoding colorspace
                    self.__jpeg_compression_colorspace = value.strip().upper()
                    # enable frame-compression encoding value
                    self.__jpeg_compression = True
                else:
                    # enable frame-compression encoding value
                    self.__jpeg_compression = value
            elif key == "jpeg_compression_quality" and isinstance(value, (int, float)):
                # set valid jpeg quality
                if value >= 10 and value <= 100:
                    self.__jpeg_compression_quality = int(value)
                else:
                    logger.warning("Skipped invalid `jpeg_compression_quality` value!")
            elif key == "jpeg_compression_fastdct" and isinstance(value, bool):
                # enable jpeg fastdct
                self.__jpeg_compression_fastdct = value
            elif key == "jpeg_compression_fastupsample" and isinstance(value, bool):
                # enable jpeg fastupsample
                self.__jpeg_compression_fastupsample = value

            # assign maximum retries in synchronous patterns
            elif key == "max_retries" and isinstance(value, int) and pattern < 2:
                if value >= 0:
                    self.__max_retries = value
                else:
                    logger.warning("Invalid `max_retries` value skipped!")

            # assign request timeout in synchronous patterns
            elif key == "request_timeout" and isinstance(value, int) and pattern < 2:
                if value >= 4:
                    self.__request_timeout = value * 1000  # convert to milliseconds
                else:
                    logger.warning("Invalid `request_timeout` value skipped!")

            # assign subscriber timeout
            elif (
                key == "subscriber_timeout"
                and isinstance(value, int)
                and pattern == 2
            ):
                if value > 0:
                    self.__subscriber_timeout = value * 1000  # convert to milliseconds
                else:
                    logger.warning("Invalid `subscriber_timeout` value skipped!")

            # handle ZMQ flags
            elif key == "flag" and isinstance(value, int):
                self.__msg_flag = value
                self.__msg_flag and logger.warning(
                    "The flag optional value is set to `1` (NOBLOCK) for this run. This might cause NetGear to not terminate gracefully."
                )
            elif key == "copy" and isinstance(value, bool):
                self.__msg_copy = value
            elif key == "track" and isinstance(value, bool):
                self.__msg_track = value
                self.__msg_copy and self.__msg_track and logger.info(
                    "The `track` optional value will be ignored for this run because `copy=True` is also defined."
                )
            else:
                pass

        # Handle ssh tunneling if enabled
        if not (self.__ssh_tunnel_mode is None):
            # SSH Tunnel Mode only available for server mode
            if receive_mode:
                logger.error("SSH Tunneling cannot be enabled for Client-end!")
            else:
                # check if SSH tunneling possible
                ssh_address = self.__ssh_tunnel_mode
                ssh_address, ssh_port = (
                    ssh_address.split(":")
                    if ":" in ssh_address
                    else [ssh_address, "22"]
                )  # default to port 22
                if "47" in ssh_port:
                    self.__ssh_tunnel_mode = self.__ssh_tunnel_mode.replace(
                        ":47", ""
                    )  # port-47 is reserved for testing
                else:
                    # extract ip for validation
                    ssh_user, ssh_ip = (
                        ssh_address.split("@")
                        if "@" in ssh_address
                        else ["", ssh_address]
                    )
                    # validate ip specified port
                    assert check_open_port(
                        ssh_ip, port=int(ssh_port)
                    ), "[NetGear:ERROR] :: Host `{}` is not available for SSH Tunneling at port-{}!".format(
                        ssh_address, ssh_port
                    )

        # Handle multiple exclusive modes if enabled
        if self.__multiclient_mode and self.__multiserver_mode:
            raise ValueError(
                "[NetGear:ERROR] :: Multi-Client and Multi-Server Mode cannot be enabled simultaneously!"
            )
        elif self.__multiserver_mode or self.__multiclient_mode:
            # check if Bidirectional Mode also enabled
            if self.__bi_mode:
                # log it
                self.__logging and logger.debug(
                    "Bidirectional Data Transmission is also enabled for this connection!"
                )
            # check if SSH Tunneling Mode also enabled
            if self.__ssh_tunnel_mode:
                # raise error
                raise ValueError(
                    "[NetGear:ERROR] :: SSH Tunneling and {} Mode cannot be enabled simultaneously. Kindly refer docs!".format(
                        "Multi-Server" if self.__multiserver_mode else "Multi-Client"
                    )
                )
        elif self.__bi_mode:
            # log Bidirectional mode activation
            self.__logging and logger.debug(
                "Bidirectional Data Transmission is enabled for this connection!"
            )
        elif self.__ssh_tunnel_mode:
            # log SSH Tunneling mode activation
            self.__logging and logger.debug(
                "SSH Tunneling is enabled for host:`{}` with `{}` back-end.".format(
                    self.__ssh_tunnel_mode,
                    "paramiko" if self.__paramiko_present else "pexpect",
                )
            )

        # On Windows, NetGear requires the ``WindowsSelectorEventLoop`` but Python 3.8 and above,
        # defaults to an ``ProactorEventLoop`` loop that is not compatible with it. Thereby,
        # we had to set it manually.
        platform.system() == "Windows" and asyncio.set_event_loop_policy(
            asyncio.WindowsSelectorEventLoopPolicy()
        )

        # define ZMQ messaging context instance
        self.__msg_context = zmq.Context.instance()

        # initialize and assign receive mode to global variable
        self.__receive_mode = receive_mode

        # Handle Secure mode
        if self.__secure_mode > 0:
            # activate and log if overwriting is enabled
            if receive_mode:
                overwrite_cert = False
                overwrite_cert and logger.warning(
                    "Overwriting ZMQ Authentication certificates is disabled for Client's end!"
                )
            else:
                overwrite_cert and self.__logging and logger.info(
                    "Overwriting ZMQ Authentication certificates over previous ones!"
                )

            # Validate certificate generation paths
            # Start threaded authenticator for this context
            try:
                # check if custom certificates path is specified
                if custom_cert_location:
                    (
                        auth_cert_dir,
                        self.__auth_secretkeys_dir,
                        self.__auth_publickeys_dir,
                    ) = generate_auth_certificates(
                        custom_cert_location, overwrite=overwrite_cert, logging=logging
                    )
                else:
                    # otherwise auto-generate suitable path
                    (
                        auth_cert_dir,
                        self.__auth_secretkeys_dir,
                        self.__auth_publickeys_dir,
                    ) = generate_auth_certificates(
                        os.path.join(expanduser("~"), ".vidgear"),
                        overwrite=overwrite_cert,
                        logging=logging,
                    )
                # log it
                self.__logging and logger.debug(
                    "`{}` is the default location for storing ZMQ authentication certificates/keys.".format(
                        auth_cert_dir
                    )
                )

                # start an authenticator for this context
                self.__z_auth = ThreadAuthenticator(self.__msg_context)
                self.__z_auth.start()
                self.__z_auth.allow(str(address))  # allow current address

                # check if `IronHouse` is activated
                if self.__secure_mode == 2:
                    # tell authenticator to use the certificate from given valid dir
                    self.__z_auth.configure_curve(
                        domain="*", location=self.__auth_publickeys_dir
                    )
                else:
                    # otherwise tell the authenticator how to handle the CURVE requests, if `StoneHouse` is activated
                    self.__z_auth.configure_curve(
                        domain="*", location=auth.CURVE_ALLOW_ANY
                    )
            except zmq.ZMQError as e:
                if "Address in use" in str(e):
                    logger.info("ZMQ Authenticator already running.")
                else:
                    # catch if any error occurred and disable Secure mode
                    logger.exception(str(e))
                    self.__secure_mode = 0
                    logger.error(
                        "ZMQ Security Mechanism is disabled for this connection due to errors!"
                    )

        # check whether `receive_mode` is enabled
        if self.__receive_mode:
            # define connection address
            address = "*" if address is None else address

            # check if multiserver_mode is enabled
            if self.__multiserver_mode:
                # check if unique server port address list/tuple is assigned or not in multiserver_mode
                if port is None or not isinstance(port, (tuple, list)):
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Server ports while Multi-Server mode is enabled. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Server Mode at PORTS: {}!".format(port)
                    )
                # create port address buffer for keeping track of connected client's port(s)
                self.__port_buffer = []
            # check if multiclient_mode is enabled
            elif self.__multiclient_mode:
                # check if unique server port address is assigned or not in multiclient_mode
                if port is None:
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Client-end. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Client Mode at PORT: {} on this device!".format(
                            port
                        )
                    )
                # assign value to global variable
                self.__port = port
            else:
                # otherwise assign local port address if None
                port = "5555" if port is None else port

            try:
                # define thread-safe messaging socket
                self.__msg_socket = self.__msg_context.socket(msg_pattern[1])

                # define pub-sub flag
                self.__pattern == 2 and self.__msg_socket.set_hwm(1)

                # enable specified secure mode for the socket
                if self.__secure_mode > 0:
                    # load server key
                    server_secret_file = os.path.join(
                        self.__auth_secretkeys_dir, "server.key_secret"
                    )
                    server_public, server_secret = auth.load_certificate(
                        server_secret_file
                    )
                    # load all CURVE keys
                    self.__msg_socket.curve_secretkey = server_secret
                    self.__msg_socket.curve_publickey = server_public
                    # enable CURVE connection for this socket
                    self.__msg_socket.curve_server = True

                # define exclusive socket options for `patterns=2`
                if self.__pattern == 2:
                    self.__msg_socket.setsockopt_string(zmq.SUBSCRIBE, "")
                    self.__subscriber_timeout and self.__msg_socket.setsockopt(
                        zmq.RCVTIMEO, self.__subscriber_timeout
                    )
                    self.__subscriber_timeout and self.__msg_socket.setsockopt(
                        zmq.LINGER, 0
                    )

                # if multiserver_mode is enabled, then assign port addresses to zmq socket
                if self.__multiserver_mode:
                    # bind socket to given server protocol, address and ports
                    for pt in port:
                        self.__msg_socket.bind(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    # bind socket to given protocol, address and port normally
                    self.__msg_socket.bind(
                        protocol + "://" + str(address) + ":" + str(port)
                    )

                # additional settings
                if pattern < 2:
                    if self.__multiserver_mode:
                        self.__connection_address = []
                        for pt in port:
                            self.__connection_address.append(
                                protocol + "://" + str(address) + ":" + str(pt)
                            )
                    else:
                        self.__connection_address = (
                            protocol + "://" + str(address) + ":" + str(port)
                        )
                    self.__msg_pattern = msg_pattern[1]
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)
                    self.__logging and logger.debug(
                        "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                            self.__max_retries, self.__request_timeout / 1000
                        )
                    )
                else:
                    self.__logging and self.__subscriber_timeout and logger.debug(
                        "Timeout: {} secs is enabled for this system.".format(
                            self.__subscriber_timeout / 1000
                        )
                    )

            except Exception as e:
                # otherwise log and raise error
                logger.exception(str(e))
                # Handle Secure Mode
                self.__secure_mode and logger.critical(
                    "Failed to activate Secure Mode: `{}` for this connection!".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                # raise errors for exclusive modes
                if self.__multiserver_mode or self.__multiclient_mode:
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Receive Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                            (
                                "Multi-Server"
                                if self.__multiserver_mode
                                else "Multi-Client"
                            ),
                            (protocol + "://" + str(address) + ":" + str(port)),
                            pattern,
                        )
                    )
                else:
                    self.__bi_mode and logger.critical(
                        "Failed to activate Bidirectional Mode for this connection!"
                    )
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Receive Mode failed to bind address: {} and pattern: {}! Kindly recheck all parameters.".format(
                            (protocol + "://" + str(address) + ":" + str(port)), pattern
                        )
                    )

            # Handle threaded queue mode
            self.__logging and logger.debug(
                "Threaded Queue Mode is enabled by default for this connection."
            )

            # define deque and assign it to global var
            self.__queue = deque(maxlen=96)  # max len 96 to check overflow

            # initialize and start threaded recv_handler
            self.__thread = Thread(target=self.__recv_handler, name="NetGear", args=())
            self.__thread.daemon = True
            self.__thread.start()

            if self.__logging:
                # finally log progress
                logger.debug(
                    "Successfully Binded to address: {} with pattern: {}.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )
                self.__jpeg_compression and logger.debug(
                    "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                        self.__jpeg_compression_colorspace,
                        self.__jpeg_compression_quality,
                        ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                        (
                            "enabled"
                            if self.__jpeg_compression_fastupsample
                            else "disabled"
                        ),
                    )
                )
                self.__secure_mode and logger.debug(
                    "Successfully enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                logger.debug("Multi-threaded Receive Mode is successfully enabled.")
                logger.debug("Unique System ID is {}.".format(self.__id))
                logger.debug("Receive Mode is now activated.")

        else:
            # otherwise default to `Send Mode`

            # define connection address
            address = "localhost" if address is None else address

            # check if multiserver_mode is enabled
            if self.__multiserver_mode:
                # check if unique server port address is assigned or not in multiserver_mode
                if port is None:
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Server-end. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Server Mode at PORT: {} on this device!".format(
                            port
                        )
                    )
                # assign value to global variable
                self.__port = port
            # check if multiclient_mode is enabled
            elif self.__multiclient_mode:
                # check if unique client port address list/tuple is assigned or not in multiclient_mode
                if port is None or not isinstance(port, (tuple, list)):
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Client ports while Multi-Client mode is enabled. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Client Mode at PORTS: {}!".format(port)
                    )
                # create port address buffer for keeping track of connected client ports
                self.__port_buffer = []
            else:
                # otherwise assign local port address if None
                port = "5555" if port is None else port

            try:
                # define thread-safe messaging socket
                self.__msg_socket = self.__msg_context.socket(msg_pattern[0])

                # if req/rep pattern, define additional flags
                if self.__pattern == 1:
                    self.__msg_socket.REQ_RELAXED = True
                    self.__msg_socket.REQ_CORRELATE = True

                # if pub/sub pattern, define additional optimizer
                if self.__pattern == 2:
                    self.__msg_socket.set_hwm(1)

                # enable specified secure mode for the socket
                if self.__secure_mode > 0:
                    # load client key
                    client_secret_file = os.path.join(
                        self.__auth_secretkeys_dir, "client.key_secret"
                    )
                    client_public, client_secret = auth.load_certificate(
                        client_secret_file
                    )
                    # load all CURVE keys
                    self.__msg_socket.curve_secretkey = client_secret
                    self.__msg_socket.curve_publickey = client_public
                    # load server key
                    server_public_file = os.path.join(
                        self.__auth_publickeys_dir, "server.key"
                    )
                    server_public, _ = auth.load_certificate(server_public_file)
                    # inject public key to make a CURVE connection.
                    self.__msg_socket.curve_serverkey = server_public

                # check if multi-client_mode is enabled
                if self.__multiclient_mode:
                    # connect socket to given protocol, address and each client port
                    for pt in port:
                        self.__msg_socket.connect(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    # handle SSH tunneling if enabled
                    if self.__ssh_tunnel_mode:
                        # establish tunnel connection
                        ssh.tunnel_connection(
                            self.__msg_socket,
                            protocol + "://" + str(address) + ":" + str(port),
                            self.__ssh_tunnel_mode,
                            keyfile=self.__ssh_tunnel_keyfile,
                            password=self.__ssh_tunnel_pwd,
                            paramiko=self.__paramiko_present,
                        )
                    else:
                        # connect socket to given protocol, address and port
                        self.__msg_socket.connect(
                            protocol + "://" + str(address) + ":" + str(port)
                        )

                # additional 
settingsifpattern<2:ifself.__multiclient_mode:self.__connection_address=[]forptinport:self.__connection_address.append(protocol+"://"+str(address)+":"+str(pt))else:self.__connection_address=(protocol+"://"+str(address)+":"+str(port))self.__msg_pattern=msg_pattern[0]self.__poll.register(self.__msg_socket,zmq.POLLIN)self.__loggingandlogger.debug("Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(self.__max_retries,self.__request_timeout/1000))exceptExceptionase:# otherwise log and raise errorlogger.exception(str(e))# Handle Secure Modeself.__secure_modeandlogger.critical("Failed to activate Secure Mode: `{}` for this connection!".format(valid_security_mech[self.__secure_mode]))# raise errors for exclusive modesifself.__multiserver_modeorself.__multiclient_mode:raiseRuntimeError("[NetGear:ERROR] :: Send Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(("Multi-Server"ifself.__multiserver_modeelse"Multi-Client"),(protocol+"://"+str(address)+":"+str(port)),pattern,))else:self.__bi_modeandlogger.critical("Failed to activate Bidirectional Mode for this connection!")self.__ssh_tunnel_modeandlogger.critical("Failed to initiate SSH Tunneling Mode for this server with `{}` back-end!".format("paramiko"ifself.__paramiko_presentelse"pexpect"))raiseRuntimeError("[NetGear:ERROR] :: Send Mode failed to connect address: {} and pattern: {}! 
Kindly recheck all parameters.".format((protocol+"://"+str(address)+":"+str(port)),pattern))ifself.__logging:# finally log progresslogger.debug("Successfully connected to address: {} with pattern: {}.".format((protocol+"://"+str(address)+":"+str(port)),pattern))self.__jpeg_compressionandlogger.debug("JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(self.__jpeg_compression_colorspace,self.__jpeg_compression_quality,("enabled"ifself.__jpeg_compression_fastdctelse"disabled"),("enabled"ifself.__jpeg_compression_fastupsampleelse"disabled"),))self.__secure_modeandlogger.debug("Enabled ZMQ Security Mechanism: `{}` for this connection.".format(valid_security_mech[self.__secure_mode]))logger.debug("Unique System ID is {}.".format(self.__id))logger.debug("Send Mode is successfully activated and ready to send data.")
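The constructor code above assembles each endpoint as `protocol + "://" + address + ":" + port`, falls back to port "5555" when none is given, and loops over a list or tuple of ports in the exclusive multi modes. A minimal standalone sketch of that address-building step follows; `build_endpoints` is a hypothetical helper written for illustration and is not part of the NetGear API.

```python
from typing import List, Sequence, Union

PortValue = Union[None, str, int, Sequence[Union[str, int]]]


def build_endpoints(
    protocol: str = "tcp", address: str = "localhost", port: PortValue = None
) -> List[str]:
    """Return one endpoint string per port (hypothetical helper, not NetGear API)."""
    # fall back to the default port when none is given, mirroring the listing above
    if port is None:
        port = "5555"
    # a list/tuple of ports (as used by the multi modes) yields several endpoints
    ports = port if isinstance(port, (list, tuple)) else [port]
    return ["{}://{}:{}".format(protocol, address, pt) for pt in ports]


print(build_endpoints())
print(build_endpoints("tcp", "*", ("5566", "5567")))
```

In the listing, a receiver binds to each such endpoint while a sender connects to it; the sketch only covers how the strings are formed.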
def close(self, kill: bool = False) -> None:
    """
    Safely terminates the threads, and NetGear resources.

    Parameters:
        kill (bool): Kills ZMQ context instead of graceful exiting in receive mode.
    """
    # log it
    self.__logging and logger.debug(
        "Terminating various {} Processes.".format(
            "Receive Mode" if self.__receive_mode else "Send Mode"
        )
    )
    # whether `receive_mode` is enabled or not
    if self.__receive_mode:
        # check whether queue mode is empty
        if not (self.__queue is None) and self.__queue:
            self.__queue.clear()
        # call immediate termination
        self.__terminate = True
        # properly close the socket
        self.__logging and logger.debug("Terminating. Please wait...")
        # Handle Secure Mode Thread
        if self.__z_auth:
            self.__logging and logger.debug("Terminating Authenticator Thread.")
            self.__z_auth.stop()
            while self.__z_auth.is_alive():
                pass
        # wait until stream resources are released
        # (producer thread might be still grabbing frame)
        if self.__thread is not None:
            self.__logging and logger.debug("Terminating Main Thread.")
            # properly handle thread exit
            if self.__thread.is_alive() and kill:
                # force close if still alive
                logger.warning("Thread still running...Killing it forcefully!")
                self.__msg_context.destroy()
                self.__thread.join()
            else:
                self.__msg_socket.close(linger=0)
                self.__thread.join()
            self.__thread = None
        self.__logging and logger.debug("Terminated Successfully!")
    else:
        # indicate that process should be terminated
        self.__terminate = True
        # log if kill enabled
        kill and logger.warning(
            "`kill` parmeter is only available in the receive mode."
        )
        # Handle Secure Mode Thread
        if self.__z_auth:
            self.__logging and logger.debug("Terminating Authenticator Thread.")
            self.__z_auth.stop()
            while self.__z_auth.is_alive():
                pass
        # check if all attempts of reconnecting failed, then skip to closure
        if (self.__pattern < 2 and not self.__max_retries) or (
            self.__multiclient_mode and not self.__port_buffer
        ):
            try:
                # properly close the socket
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
            except ZMQError:
                pass
            finally:
                # exit
                return

        if self.__multiserver_mode:
            # check if multiserver_mode
            # send termination flag to client with its unique port
            term_dict = dict(terminate_flag=True, port=self.__port)
        else:
            # otherwise send termination flag to client
            term_dict = dict(terminate_flag=True)

        try:
            if self.__multiclient_mode:
                for _ in self.__port_buffer:
                    self.__msg_socket.send_json(term_dict)
            else:
                self.__msg_socket.send_json(term_dict)
            # check for confirmation if available within 1/5 timeout
            if self.__pattern < 2:
                self.__logging and logger.debug("Terminating. Please wait...")
                if self.__msg_socket.poll(self.__request_timeout // 5, zmq.POLLIN):
                    self.__msg_socket.recv()
        except Exception as e:
            if not isinstance(e, ZMQError):
                logger.exception(str(e))
        finally:
            # properly close the socket
            self.__msg_socket.setsockopt(zmq.LINGER, 0)
            self.__msg_socket.close()
            self.__logging and logger.debug("Terminated Successfully!")
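In Send Mode, the `close()` listing above builds a small termination dictionary (with the sender's port added in Multi-Servers Mode) and, for the synchronous patterns, waits for a confirmation for one fifth of the request timeout. The sketch below isolates those two steps as plain functions. Both function names and the 4000 ms figure are assumptions chosen for illustration, and no socket is involved.

```python
import json
from typing import Any, Dict, Optional


def build_termination_message(
    multiserver_mode: bool, port: Optional[str] = None
) -> Dict[str, Any]:
    """Build the termination payload (hypothetical helper, not NetGear API)."""
    if multiserver_mode:
        # a server in Multi-Servers Mode identifies itself by its unique port
        return dict(terminate_flag=True, port=port)
    return dict(terminate_flag=True)


def confirmation_timeout(request_timeout_ms: int) -> int:
    """Wait window for the final confirmation: one fifth of the request timeout."""
    return request_timeout_ms // 5


# the payload is JSON-serializable, which is what a JSON send call would require
print(json.dumps(build_termination_message(True, "5566")))
print(confirmation_timeout(4000))  # 4000 ms is an arbitrary example value
```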
A receiver-end method that synchronously extracts received frames from the monitored deque while maintaining a fixed-length frame buffer in memory, and blocks the thread if the deque is full.
Parameters:
return_data (any): input data (of any datatype) to send back to the Server. Default: None.
def recv(self, return_data=None) -> Optional[NDArray]:
    """
    A Receiver end method, that extracts received frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory,
    and blocks the thread if the deque is full.

    Parameters:
        return_data (any): inputs return data _(of any datatype)_, for sending back to Server.

    **Returns:** A n-dimensional numpy array.
    """
    # check whether `receive mode` is activated
    if not (self.__receive_mode):
        # raise value error and exit
        self.__terminate = True
        raise ValueError(
            "[NetGear:ERROR] :: `recv()` function cannot be used while receive_mode is disabled. Kindly refer vidgear docs!"
        )

    # handle Bidirectional return data
    if (self.__bi_mode or self.__multiclient_mode) and not (return_data is None):
        self.__return_data = return_data

    # check whether or not termination flag is enabled
    while not self.__terminate:
        try:
            # check if queue is empty
            if len(self.__queue) > 0:
                return self.__queue.popleft()
            else:
                time.sleep(0.00001)
                continue
        except KeyboardInterrupt:
            self.__terminate = True
            break
    # otherwise return NoneType
    return None
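The `recv()` listing above polls a bounded deque until either a frame is available or the termination flag is set. The standard-library sketch below reproduces that consumer loop with a background producer thread. `FrameBuffer`, `producer`, and the string "frames" are hypothetical stand-ins, and the real receiver thread that fills the deque is not shown in this section.

```python
import threading
import time
from collections import deque
from typing import Any, List, Optional


class FrameBuffer:
    """Hypothetical stand-in for the receiver's deque handling (not NetGear API)."""

    def __init__(self, maxlen: int = 96) -> None:
        self.queue: deque = deque(maxlen=maxlen)  # bounded buffer, as in the listing
        self.terminate = False

    def put(self, item: Any) -> None:
        self.queue.append(item)

    def get(self) -> Optional[Any]:
        # poll until an item arrives or termination is requested
        while not self.terminate:
            if len(self.queue) > 0:
                return self.queue.popleft()
            time.sleep(0.00001)
        return None


buffer = FrameBuffer()


def producer() -> None:
    for i in range(3):
        buffer.put("frame-{}".format(i))
        time.sleep(0.01)
    # wait for the consumer to drain the buffer, then request termination
    while buffer.queue:
        time.sleep(0.001)
    buffer.terminate = True


worker = threading.Thread(target=producer, daemon=True)
worker.start()

received: List[Any] = []
while True:
    frame = buffer.get()
    if frame is None:  # None signals termination, as in recv()
        break
    received.append(frame)
worker.join()
print(received)
```

A caller of the real method would follow the same shape: loop on the receive call and stop when it returns None.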
def send(self, frame: NDArray, message: Any = None) -> Optional[Any]:
    """
    A Server end method, that sends the data and frames over the network to Client(s).

    Parameters:
        frame (numpy.ndarray): inputs numpy array(frame).
        message (any): input for sending additional data _(of any datatype except `numpy.ndarray`)_ to Client(s).

    **Returns:** Data _(of any datatype)_ in selected exclusive modes, otherwise None-type.

    """
    # check whether `receive_mode` is disabled
    if self.__receive_mode:
        # raise value error and exit
        self.__terminate = True
        raise ValueError(
            "[NetGear:ERROR] :: `send()` function cannot be used while receive_mode is enabled. Kindly refer vidgear docs!"
        )

    if not (message is None) and isinstance(message, np.ndarray):
        logger.warning(
            "Skipped unsupported `message` of datatype: {}!".format(
                type(message).__name__
            )
        )
        message = None

    # define exit_flag and assign value
    exit_flag = True if (frame is None or self.__terminate) else False

    # check whether exit_flag is False
    if not (exit_flag) and not (frame.flags["C_CONTIGUOUS"]):
        # check whether the incoming frame is contiguous
        frame = np.ascontiguousarray(frame, dtype=frame.dtype)

    # handle JPEG compression encoding
    if self.__jpeg_compression:
        if self.__jpeg_compression_colorspace == "GRAY":
            if frame.ndim == 2:
                # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                frame = np.expand_dims(frame, axis=2)
            frame = simplejpeg.encode_jpeg(
                frame,
                quality=self.__jpeg_compression_quality,
                colorspace=self.__jpeg_compression_colorspace,
                fastdct=self.__jpeg_compression_fastdct,
            )
        else:
            frame = simplejpeg.encode_jpeg(
                frame,
                quality=self.__jpeg_compression_quality,
                colorspace=self.__jpeg_compression_colorspace,
                colorsubsampling="422",
                fastdct=self.__jpeg_compression_fastdct,
            )

    # check if multiserver_mode is activated and assign values with unique port
    msg_dict = dict(port=self.__port) if self.__multiserver_mode else dict()

    # prepare the exclusive json dict
    msg_dict.update(
        dict(
            terminate_flag=exit_flag,
            compression=(
                {
                    "dct": self.__jpeg_compression_fastdct,
                    "ups": self.__jpeg_compression_fastupsample,
                    "colorspace": self.__jpeg_compression_colorspace,
                }
                if self.__jpeg_compression
                else False
            ),
            message=message,
            pattern=str(self.__pattern),
            dtype=str(frame.dtype) if not (self.__jpeg_compression) else "",
            shape=frame.shape if not (self.__jpeg_compression) else "",
        )
    )

    # send the json dict
    self.__msg_socket.send_json(msg_dict, self.__msg_flag | zmq.SNDMORE)
    # send the frame array with correct flags
    self.__msg_socket.send(
        frame, flags=self.__msg_flag, copy=self.__msg_copy, track=self.__msg_track
    )

    # check if synchronous patterns, then wait for confirmation
    if self.__pattern < 2:
        # check if Bidirectional data transmission is enabled
        if self.__bi_mode or self.__multiclient_mode:
            # handles return data
            recvd_data = None

            socks = dict(self.__poll.poll(self.__request_timeout))
            if socks.get(self.__msg_socket) == zmq.POLLIN:
                # handle return data
                recv_json = self.__msg_socket.recv_json(flags=self.__msg_flag)
            else:
                logger.critical("No response from Client, Reconnecting again...")
                # Socket is confused. Close and remove it.
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
                self.__poll.unregister(self.__msg_socket)
                self.__max_retries -= 1

                if not (self.__max_retries):
                    if self.__multiclient_mode:
                        logger.error(
                            "All Clients failed to respond on multiple attempts."
                        )
                    else:
                        logger.error(
                            "Client failed to respond on multiple attempts."
                        )
                    self.__terminate = True
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Client(s) seems to be offline, Abandoning."
                    )

                # Create new connection
                self.__msg_socket = self.__msg_context.socket(self.__msg_pattern)
                if isinstance(self.__connection_address, list):
                    for _connection in self.__connection_address:
                        self.__msg_socket.connect(_connection)
                else:
                    # handle SSH tunneling if enabled
                    if self.__ssh_tunnel_mode:
                        # establish tunnel connection
                        ssh.tunnel_connection(
                            self.__msg_socket,
                            self.__connection_address,
                            self.__ssh_tunnel_mode,
                            keyfile=self.__ssh_tunnel_keyfile,
                            password=self.__ssh_tunnel_pwd,
                            paramiko=self.__paramiko_present,
                        )
                    else:
                        # connect normally
                        self.__msg_socket.connect(self.__connection_address)
                self.__poll.register(self.__msg_socket, zmq.POLLIN)
                # return None for mean-time
                return None

            # save the unique port addresses
            if (
                self.__multiclient_mode
                and not recv_json["port"] in self.__port_buffer
            ):
                self.__port_buffer.append(recv_json["port"])

            if recv_json["return_type"] == "ndarray":
                recv_array = self.__msg_socket.recv(
                    flags=self.__msg_flag,
                    copy=self.__msg_copy,
                    track=self.__msg_track,
                )
                # check if encoding was enabled
                if recv_json["compression"]:
                    # decode JPEG frame
                    recvd_data = simplejpeg.decode_jpeg(
                        recv_array,
                        colorspace=recv_json["compression"]["colorspace"],
                        fastdct=self.__jpeg_compression_fastdct
                        or recv_json["compression"]["dct"],
                        fastupsample=self.__jpeg_compression_fastupsample
                        or recv_json["compression"]["ups"],
                    )
                    # check if valid frame returned
                    if recvd_data is None:
                        self.__terminate = True
                        # otherwise raise error and exit
                        raise RuntimeError(
                            "[NetGear:ERROR] :: Received compressed frame `{}` decoding failed with flag: {}.".format(
                                recv_json["compression"],
                                self.__ex_compression_params,
                            )
                        )

                    if (
                        recv_json["compression"]["colorspace"] == "GRAY"
                        and recvd_data.ndim == 3
                    ):
                        # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                        recvd_data = np.squeeze(recvd_data, axis=2)
                else:
                    recvd_data = np.frombuffer(
                        recv_array, dtype=recv_json["array_dtype"]
                    ).reshape(recv_json["array_shape"])
            else:
                recvd_data = recv_json["data"]

            return (
                (recv_json["port"], recvd_data)
                if self.__multiclient_mode
                else recvd_data
            )
        else:
            # otherwise log normally
            socks = dict(self.__poll.poll(self.__request_timeout))
            if socks.get(self.__msg_socket) == zmq.POLLIN:
                recv_confirmation = self.__msg_socket.recv()
            else:
                logger.critical("No response from Client, Reconnecting again...")
                # Socket is confused. Close and remove it.
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
                self.__poll.unregister(self.__msg_socket)
                self.__max_retries -= 1

                if not (self.__max_retries):
                    logger.error("Client failed to respond on repeated attempts.")
                    self.__terminate = True
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Client seems to be offline, Abandoning!"
                    )

                # Create new connection
                self.__msg_socket = self.__msg_context.socket(self.__msg_pattern)
                # handle SSH tunneling if enabled
                if self.__ssh_tunnel_mode:
                    # establish tunnel connection
                    ssh.tunnel_connection(
                        self.__msg_socket,
                        self.__connection_address,
                        self.__ssh_tunnel_mode,
                        keyfile=self.__ssh_tunnel_keyfile,
                        password=self.__ssh_tunnel_pwd,
                        paramiko=self.__paramiko_present,
                    )
                else:
                    # connect normally
                    self.__msg_socket.connect(self.__connection_address)
                self.__poll.register(self.__msg_socket, zmq.POLLIN)
                return None

            # log confirmation
            self.__logging and logger.debug(recv_confirmation)
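For the synchronous patterns, the `send()` listing above follows the Lazy Pirate approach: poll for a reply within a timeout, and if none arrives, close the socket, decrement the retry counter, then either abandon with an error or open a fresh connection and return None for that call. The sketch below mimics that control flow with standard-library objects only. `FakeChannel` and `ReliableSender` are hypothetical stand-ins for the ZeroMQ socket and the sender, so this illustrates the retry logic rather than the actual network code.

```python
import queue
from typing import Callable, Optional


class FakeChannel:
    """Hypothetical stand-in for a socket: it replies only when `alive` is True."""

    def __init__(self, alive: bool) -> None:
        self.alive = alive
        self._replies: "queue.Queue[bytes]" = queue.Queue()

    def send(self, payload: bytes) -> None:
        if self.alive:
            self._replies.put(b"ack:" + payload)

    def poll(self, timeout_s: float) -> Optional[bytes]:
        # wait up to `timeout_s` for a reply instead of blocking forever
        try:
            return self._replies.get(timeout=timeout_s)
        except queue.Empty:
            return None

    def close(self) -> None:
        pass


class ReliableSender:
    """Retry logic modelled on the listing above (illustrative only)."""

    def __init__(
        self,
        connect: Callable[[], FakeChannel],
        max_retries: int = 3,
        timeout_s: float = 0.01,
    ) -> None:
        self._connect = connect
        self.retries_left = max_retries
        self.timeout_s = timeout_s
        self.channel = connect()

    def send(self, payload: bytes) -> Optional[bytes]:
        self.channel.send(payload)
        reply = self.channel.poll(self.timeout_s)
        if reply is not None:
            return reply
        # no reply in time: discard the channel and count one failed attempt
        self.channel.close()
        self.retries_left -= 1
        if not self.retries_left:
            raise RuntimeError("peer seems to be offline, abandoning")
        # otherwise reconnect and report "nothing yet" for this call
        self.channel = self._connect()
        return None


# the first two connections are unresponsive, the third one answers
attempts = iter([False, False, True])
sender = ReliableSender(lambda: FakeChannel(next(attempts)), max_retries=3)
results = [sender.send(b"frame") for _ in range(3)]
print(results)
```

As in the listing, the retry counter in this sketch is not reset after a successful reply, so it counts failures over the lifetime of the sender.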