Skip to content

NetGear_Async API References

NetGear_Async API usage examples can be found here ➶

NetGear_Async API parameters are explained here ➶

NetGear_Async can generate the same performance as NetGear API at about one-third the memory consumption, and also provide complete server-client handling with various options to use variable protocols/patterns similar to NetGear, but lacks in term of flexibility as it supports only a few NetGear's Exclusive Modes.

NetGear_Async is built on zmq.asyncio, and powered by a high-performance asyncio event loop called uvloop to achieve unwatchable high-speed and lag-free video streaming over the network with minimal resource constraints. NetGear_Async can transfer thousands of frames in just a few seconds without causing any significant load on your system.

NetGear_Async provides complete server-client handling and options to use variable protocols/patterns similar to NetGear API. Furthermore, NetGear_Async allows us to define our custom Server as source to transform frames easily before sending them across the network.

NetGear_Async now supports additional bidirectional data transmission between receiver(client) and sender(server) while transferring frames. Users can easily build complex applications such as like Real-Time Video Chat in just few lines of code.

In addition to all this, NetGear_Async API also provides internal wrapper around VideoGear, which itself provides internal access to both CamGear and PiGear APIs, thereby granting it exclusive power for transferring frames incoming from any source to the network.

NetGear_Async as of now supports four ZeroMQ messaging patterns:

  • zmq.PAIR (ZMQ Pair Pattern)
  • zmq.REQ/zmq.REP (ZMQ Request/Reply Pattern)
  • zmq.PUB/zmq.SUB (ZMQ Publish/Subscribe Pattern)
  • zmq.PUSH/zmq.PULL (ZMQ Push/Pull Pattern)

Whereas supported protocol are: tcp and ipc.

Source code in vidgear/gears/asyncio/netgear_async.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
class NetGear_Async:
    """
    NetGear_Async can generate the same performance as NetGear API at about one-third the memory consumption, and also provide complete server-client handling with various
    options to use variable protocols/patterns similar to NetGear, but lacks in term of flexibility as it supports only a few NetGear's Exclusive Modes.

    NetGear_Async is built on `zmq.asyncio`, and powered by a high-performance asyncio event loop called uvloop to achieve unwatchable high-speed and lag-free video streaming
    over the network with minimal resource constraints. NetGear_Async can transfer thousands of frames in just a few seconds without causing any significant load on your
    system.

    NetGear_Async provides complete server-client handling and options to use variable protocols/patterns similar to NetGear API. Furthermore, NetGear_Async allows us to define
     our custom Server as source to transform frames easily before sending them across the network.

    NetGear_Async now supports additional **bidirectional data transmission** between receiver(client) and sender(server) while transferring frames.
    Users can easily build complex applications such as like _Real-Time Video Chat_ in just few lines of code.

    In addition to all this, NetGear_Async API also provides internal wrapper around VideoGear, which itself provides internal access to both CamGear and PiGear APIs, thereby
    granting it exclusive power for transferring frames incoming from any source to the network.

    NetGear_Async as of now supports four ZeroMQ messaging patterns:

    - `zmq.PAIR` _(ZMQ Pair Pattern)_
    - `zmq.REQ/zmq.REP` _(ZMQ Request/Reply Pattern)_
    - `zmq.PUB/zmq.SUB` _(ZMQ Publish/Subscribe Pattern)_
    - `zmq.PUSH/zmq.PULL` _(ZMQ Push/Pull Pattern)_

    Whereas supported protocol are: `tcp` and `ipc`.
    """

    def __init__(
        self,
        # NetGear_Async parameters
        address: str = None,
        port: str = None,
        protocol: str = "tcp",
        pattern: int = 0,
        receive_mode: bool = False,
        timeout: Union[int, float] = 0.0,
        # Videogear parameters
        enablePiCamera: bool = False,
        stabilize: bool = False,
        source: Any = None,
        camera_num: int = 0,
        stream_mode: bool = False,
        backend: int = 0,
        colorspace: str = None,
        resolution: Tuple[int, int] = (640, 480),
        framerate: Union[int, float] = 25,
        time_delay: int = 0,
        # common parameters
        logging: bool = False,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the NetGear_Async class.

        Parameters:
            address (str): sets the valid network address of the Server/Client.
            port (str): sets the valid Network Port of the Server/Client.
            protocol (str): sets the valid messaging protocol between Server/Client.
            pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
            receive_mode (bool): select the NetGear_Async's Mode of operation.
            timeout (int/float): controls the maximum waiting time(in sec) after which Client throws `TimeoutError`.
            enablePiCamera (bool): provide access to PiGear(if True) or CamGear(if False) APIs respectively.
            stabilize (bool): enable access to Stabilizer Class for stabilizing frames.
            camera_num (int): selects the camera module index which will be used as Rpi source.
            resolution (tuple): sets the resolution (i.e. `(width,height)`) of the Rpi source.
            framerate (int/float): sets the framerate of the Rpi source.
            source (based on input): defines the source for the input stream.
            stream_mode (bool): controls the exclusive YouTube Mode.
            backend (int): selects the backend for OpenCV's VideoCapture class.
            colorspace (str): selects the colorspace of the input stream.
            logging (bool): enables/disables logging.
            time_delay (int): time delay (in sec) before start reading the frames.
            options (dict): provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        import_dependency_safe(
            "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
        )
        import_dependency_safe("msgpack" if msgpack is None else "")
        import_dependency_safe("msgpack_numpy" if m is None else "")

        # define valid messaging patterns => `0`: PAIR, `1`:(REQ, REP), `2`:(SUB, PUB), `3`:(PUSH, PULL)
        valid_messaging_patterns = {
            0: (zmq.PAIR, zmq.PAIR),
            1: (zmq.REQ, zmq.REP),
            2: (zmq.PUB, zmq.SUB),
            3: (zmq.PUSH, zmq.PULL),
        }

        # check whether user-defined messaging pattern is valid
        if isinstance(pattern, int) and pattern in valid_messaging_patterns:
            # assign value
            self.__msg_pattern = pattern
            self.__pattern = valid_messaging_patterns[pattern]
        else:
            # otherwise default to 0:`zmq.PAIR`
            self.__msg_pattern = 0
            self.__pattern = valid_messaging_patterns[self.__msg_pattern]
            self.__logging and logger.warning(
                "Invalid pattern {pattern}. Defaulting to `zmq.PAIR`!".format(
                    pattern=pattern
                )
            )

        # check  whether user-defined messaging protocol is valid
        if isinstance(protocol, str) and protocol in ["tcp", "ipc"]:
            # assign value
            self.__protocol = protocol
        else:
            # else default to `tcp` protocol
            self.__protocol = "tcp"
            self.__logging and logger.warning("Invalid protocol. Defaulting to `tcp`!")

        # initialize Termination flag
        self.__terminate = False
        # initialize and assign `Receive Mode`
        self.__receive_mode = receive_mode
        # initialize stream handler
        self.__stream = None
        # initialize Messaging Socket
        self.__msg_socket = None
        # initialize NetGear_Async's configuration dictionary
        self.config = {}
        # asyncio queue handler
        self.__queue = None
        # define Bidirectional mode
        self.__bi_mode = False  # handles Bidirectional mode state

        # assign timeout for Receiver end
        if timeout and isinstance(timeout, (int, float)):
            self.__timeout = float(timeout)
        else:
            self.__timeout = 15.0

        # generate 8-digit random system id
        self.__id = "".join(
            secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
        )

        # Handle user-defined options dictionary values
        # reformat dictionary
        options = {str(k).strip(): v for k, v in options.items()}
        # handle bidirectional mode
        if "bidirectional_mode" in options:
            value = options["bidirectional_mode"]
            # also check if pattern and source is valid
            if isinstance(value, bool) and pattern < 2 and source is None:
                # activate Bidirectional mode if specified
                self.__bi_mode = value
            else:
                # otherwise disable it
                self.__bi_mode = False
                logger.warning("Bidirectional data transmission is disabled!")
            # handle errors and logging
            if pattern >= 2:
                # raise error
                raise ValueError(
                    "[NetGear_Async:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )
            elif not (source is None):
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Custom source must be used when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )
            elif isinstance(value, bool) and self.__logging:
                # log Bidirectional mode activation
                logger.debug(
                    "Bidirectional Data Transmission is {} for this connection!".format(
                        "enabled" if value else "disabled"
                    )
                )
            else:
                logger.error("`bidirectional_mode` value is invalid!")
            # clean
            del options["bidirectional_mode"]

        # Setup and assign event loop policy
        if platform.system() == "Windows":
            # On Windows, VidGear requires the ``WindowsSelectorEventLoop``, but Python 3.8 and above,
            # defaults to an ``ProactorEventLoop`` loop that is not compatible with it. Thereby,
            # we had to set it manually.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        else:
            if not (uvloop is None):
                # Latest uvloop eventloop is only available for UNIX machines.
                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            else:
                # log if not present
                import_dependency_safe("uvloop", error="log")

        # Retrieve event loop and assign it
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            # otherwise create one
            logger.critical("No running event loop found. Creating a new one.")
            self.loop = asyncio.new_event_loop()

        # log eventloop for debugging
        self.__logging and logger.info(
            "Using ``{}`` event loop for this process.".format(
                self.loop.__class__.__name__
            )
        )

        # define messaging asynchronous Context
        self.__msg_context = zmq.asyncio.Context()

        # check whether `Receive Mode` is enabled
        if receive_mode:
            # assign local IP address if None
            if address is None:
                self.__address = "*"  # define address
            else:
                self.__address = address
            # assign default port address if None
            if port is None:
                self.__port = "5555"
            else:
                self.__port = port
        else:
            # Handle video source
            if source is None:
                self.config = {"generator": None}
                self.__logging and logger.warning("Given source is of NoneType!")
            else:
                # define stream with necessary params
                self.__stream = VideoGear(
                    enablePiCamera=enablePiCamera,
                    stabilize=stabilize,
                    source=source,
                    camera_num=camera_num,
                    stream_mode=stream_mode,
                    backend=backend,
                    colorspace=colorspace,
                    resolution=resolution,
                    framerate=framerate,
                    logging=logging,
                    time_delay=time_delay,
                    **options
                )
                # define default frame generator in configuration
                self.config = {"generator": self.__frame_generator()}
            # assign local ip address if None
            if address is None:
                self.__address = "localhost"
            else:
                self.__address = address
            # assign default port address if None
            if port is None:
                self.__port = "5555"
            else:
                self.__port = port
            # add server task handler
            self.task = None

        # create asyncio queue if bidirectional mode activated
        self.__queue = asyncio.Queue() if self.__bi_mode else None

    def launch(self) -> T:
        """
        Launches an asynchronous generators and loop executors for respective task.
        """
        # check if receive mode enabled
        if self.__receive_mode:
            self.__logging and logger.debug(
                "Launching NetGear_Async asynchronous generator!"
            )
            # run loop executor for Receiver asynchronous generator
            self.loop.run_in_executor(None, self.recv_generator)
        else:
            # Otherwise launch Server handler
            self.__logging and logger.debug(
                "Creating NetGear_Async asynchronous server handler!"
            )
            # create task for Server Handler
            self.task = self.loop.create_task(self.__server_handler())
        # return instance
        return self

    async def __server_handler(self):
        """
        Handles various Server-end processes/tasks.
        """
        # validate assigned frame generator in NetGear_Async configuration
        if isinstance(self.config, dict) and "generator" in self.config:
            # check if its  assigned value is a asynchronous generator
            if self.config["generator"] is None or not inspect.isasyncgen(
                self.config["generator"]
            ):
                # otherwise raise error
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Invalid configuration. Assigned generator must be a asynchronous generator function/method only!"
                )
        else:
            # raise error if validation fails
            raise RuntimeError(
                "[NetGear_Async:ERROR] :: Assigned NetGear_Async configuration is invalid!"
            )

        # define our messaging socket
        self.__msg_socket = self.__msg_context.socket(self.__pattern[0])

        # if req/rep pattern, define additional flags
        if self.__msg_pattern == 1:
            self.__msg_socket.REQ_RELAXED = True
            self.__msg_socket.REQ_CORRELATE = True

        # if pub/sub pattern, define additional optimizer
        if self.__msg_pattern == 2:
            self.__msg_socket.set_hwm(1)

        # try connecting socket to assigned protocol, address and port
        try:
            self.__msg_socket.connect(
                self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
            )
            # finally log if successful
            self.__logging and logger.debug(
                "Successfully connected to address: {} with pattern: {}.".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )
            logger.critical(
                "Send Mode is successfully activated and ready to send data!"
            )
        except Exception as e:
            # log ad raise error if failed
            logger.exception(str(e))
            if self.__bi_mode:
                logger.error(
                    "Failed to activate Bidirectional Mode for this connection!"
                )
            raise ValueError(
                "[NetGear_Async:ERROR] :: Failed to connect address: {} and pattern: {}!".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )

        # loop over our Asynchronous frame generator
        async for dataframe in self.config["generator"]:
            # extract data if bidirectional mode
            if self.__bi_mode and len(dataframe) == 2:
                (data, frame) = dataframe
                if not (data is None) and isinstance(data, np.ndarray):
                    logger.warning(
                        "Skipped unsupported `data` of datatype: {}!".format(
                            type(data).__name__
                        )
                    )
                    data = None
                assert isinstance(
                    frame, np.ndarray
                ), "[NetGear_Async:ERROR] :: Invalid data received from server end!"
            elif self.__bi_mode:
                # raise error for invalid data
                raise ValueError(
                    "[NetGear_Async:ERROR] :: Send Mode only accepts tuple(data, frame) as input in Bidirectional Mode. \
                    Kindly refer vidgear docs!"
                )
            else:
                # otherwise just make a copy of frame
                frame = np.copy(dataframe)
                data = None

            # check if retrieved frame is `CONTIGUOUS`
            if not (frame.flags["C_CONTIGUOUS"]):
                # otherwise make it
                frame = np.ascontiguousarray(frame, dtype=frame.dtype)

            # create data dict
            data_dict = dict(
                terminate=False,
                bi_mode=self.__bi_mode,
                data=data if not (data is None) else "",
            )
            # encode it
            data_enc = msgpack.packb(data_dict)
            # send the encoded data with correct flags
            await self.__msg_socket.send(data_enc, flags=zmq.SNDMORE)

            # encode frame
            frame_enc = msgpack.packb(frame, default=m.encode)
            # send the encoded frame
            await self.__msg_socket.send_multipart([frame_enc])

            # check if bidirectional patterns used
            if self.__msg_pattern < 2:
                # handle bidirectional data transfer if enabled
                if self.__bi_mode:
                    # get receiver encoded message withing timeout limit
                    recvdmsg_encoded = await asyncio.wait_for(
                        self.__msg_socket.recv(), timeout=self.__timeout
                    )
                    # retrieve receiver data from encoded message
                    recvd_data = msgpack.unpackb(recvdmsg_encoded, use_list=False)
                    # check message type
                    if recvd_data["return_type"] == "ndarray":  # numpy.ndarray
                        # get encoded frame from receiver
                        recvdframe_encoded = await asyncio.wait_for(
                            self.__msg_socket.recv_multipart(), timeout=self.__timeout
                        )
                        # retrieve frame and put in queue
                        await self.__queue.put(
                            msgpack.unpackb(
                                recvdframe_encoded[0],
                                use_list=False,
                                object_hook=m.decode,
                            )
                        )
                    else:
                        # otherwise put data directly in queue
                        await self.__queue.put(
                            recvd_data["return_data"]
                            if recvd_data["return_data"]
                            else None
                        )
                else:
                    # otherwise log received confirmation
                    recv_confirmation = await asyncio.wait_for(
                        self.__msg_socket.recv(), timeout=self.__timeout
                    )
                    self.__logging and logger.debug(recv_confirmation)

    async def recv_generator(self) -> AsyncGenerator[Tuple[Any, NDArray], NDArray]:
        """
        A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
        """
        # check whether `receive mode` is activated
        if not (self.__receive_mode):
            # raise Value error and exit
            self.__terminate = True
            raise ValueError(
                "[NetGear_Async:ERROR] :: `recv_generator()` function cannot be accessed while `receive_mode` is disabled. Kindly refer vidgear docs!"
            )

        # initialize and define messaging socket
        self.__msg_socket = self.__msg_context.socket(self.__pattern[1])

        # define exclusive socket options for patterns
        if self.__msg_pattern == 2:
            self.__msg_socket.set_hwm(1)
            self.__msg_socket.setsockopt(zmq.SUBSCRIBE, b"")

        try:
            # bind socket to the assigned protocol, address and port
            self.__msg_socket.bind(
                self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
            )
            # finally log progress
            self.__logging and logger.debug(
                "Successfully binded to address: {} with pattern: {}.".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                )
            )
            logger.critical("Receive Mode is activated successfully!")
        except Exception as e:
            logger.exception(str(e))
            raise RuntimeError(
                "[NetGear_Async:ERROR] :: Failed to bind address: {} and pattern: {}{}!".format(
                    (
                        self.__protocol
                        + "://"
                        + str(self.__address)
                        + ":"
                        + str(self.__port)
                    ),
                    self.__msg_pattern,
                    " and Bidirectional Mode enabled" if self.__bi_mode else "",
                )
            )

        # loop until terminated
        while not self.__terminate:
            # get encoded data message from server withing timeout limit
            datamsg_encoded = await asyncio.wait_for(
                self.__msg_socket.recv(), timeout=self.__timeout
            )
            # retrieve data from message
            data = msgpack.unpackb(datamsg_encoded, use_list=False)
            # terminate if exit` flag received from server
            if data["terminate"]:
                # send confirmation message to server if bidirectional patterns
                if self.__msg_pattern < 2:
                    # create termination confirmation message
                    return_dict = dict(
                        terminated="Client-`{}` successfully terminated!".format(
                            self.__id
                        ),
                    )
                    # encode message
                    retdata_enc = msgpack.packb(return_dict)
                    # send message back to server
                    await self.__msg_socket.send(retdata_enc)
                self.__logging and logger.info(
                    "Termination signal received from server!"
                )
                # break loop and terminate
                self.__terminate = True
                break
            # get encoded frame message from server withing timeout limit
            framemsg_encoded = await asyncio.wait_for(
                self.__msg_socket.recv_multipart(), timeout=self.__timeout
            )
            # retrieve frame from message
            frame = msgpack.unpackb(
                framemsg_encoded[0], use_list=False, object_hook=m.decode
            )

            # check if bidirectional patterns
            if self.__msg_pattern < 2:
                # handle bidirectional data transfer if enabled
                if self.__bi_mode and data["bi_mode"]:
                    # handle empty queue
                    if not self.__queue.empty():
                        return_data = await self.__queue.get()
                        self.__queue.task_done()
                    else:
                        return_data = None
                    # check if we are returning `ndarray` frames
                    if not (return_data is None) and isinstance(
                        return_data, np.ndarray
                    ):
                        # check whether the incoming frame is contiguous
                        if not (return_data.flags["C_CONTIGUOUS"]):
                            return_data = np.ascontiguousarray(
                                return_data, dtype=return_data.dtype
                            )

                        # create return type dict without data
                        rettype_dict = dict(
                            return_type=(type(return_data).__name__), return_data=None,
                        )
                        # encode it
                        rettype_enc = msgpack.packb(rettype_dict)
                        # send it to server with correct flags
                        await self.__msg_socket.send(rettype_enc, flags=zmq.SNDMORE)

                        # encode return ndarray data
                        retframe_enc = msgpack.packb(return_data, default=m.encode)
                        # send it over network to server
                        await self.__msg_socket.send_multipart([retframe_enc])
                    else:
                        # otherwise create type and data dict
                        return_dict = dict(
                            return_type=(type(return_data).__name__),
                            return_data=(
                                return_data if not (return_data is None) else ""
                            ),
                        )
                        # encode it
                        retdata_enc = msgpack.packb(return_dict)
                        # send it over network to server
                        await self.__msg_socket.send(retdata_enc)
                elif self.__bi_mode or data["bi_mode"]:
                    # raise error if bidirectional mode is disabled at server or client but not both
                    raise RuntimeError(
                        "[NetGear_Async:ERROR] :: Invalid configuration! Bidirectional Mode is not activate on {} end.".format(
                            "client" if self.__bi_mode else "server"
                        )
                    )
                else:
                    # otherwise just send confirmation message to server
                    await self.__msg_socket.send(
                        bytes(
                            "Data received on client: {} !".format(self.__id), "utf-8"
                        )
                    )
            # yield received tuple(data-frame) if bidirectional mode or else just frame
            if self.__bi_mode:
                yield (data["data"], frame) if data["data"] else (None, frame)
            else:
                yield frame
            # sleep for sometime
            await asyncio.sleep(0)

    async def __frame_generator(self):
        """
        Returns a default frame-generator for NetGear_Async's Server Handler.
        """
        # start stream
        self.__stream.start()
        # loop over stream until its terminated
        while not self.__terminate:
            # read frames
            frame = self.__stream.read()
            # break if NoneType
            if frame is None:
                break
            # yield frame
            yield frame
            # sleep for sometime
            await asyncio.sleep(0)

    async def transceive_data(self, data: Any = None) -> Any:
        """
        Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.

        Parameters:
            data (any): inputs data _(of any datatype)_ for sending back to Server.
        """
        recvd_data = None
        if not self.__terminate:
            if self.__bi_mode:
                if self.__receive_mode:
                    await self.__queue.put(data)
                else:
                    if not self.__queue.empty():
                        recvd_data = await self.__queue.get()
                        self.__queue.task_done()
            else:
                logger.error(
                    "`transceive_data()` function cannot be used when Bidirectional Mode is disabled."
                )
        return recvd_data

    async def __terminate_connection(self, disable_confirmation=False):
        """
        Internal asyncio method to safely terminate ZMQ connection and queues

        Parameters:
            disable_confirmation (boolean): Force disable termination confirmation from client in bidirectional patterns.
        """
        # log termination
        self.__logging and logger.debug(
            "Terminating various {} Processes. Please wait.".format(
                "Receive Mode" if self.__receive_mode else "Send Mode"
            )
        )

        # check whether `receive_mode` is enabled or not
        if self.__receive_mode:
            # indicate that process should be terminated
            self.__terminate = True
        else:
            # indicate that process should be terminated
            self.__terminate = True
            # terminate stream
            if not (self.__stream is None):
                self.__stream.stop()
            # signal `exit` flag for termination!
            data_dict = dict(terminate=True)
            data_enc = msgpack.packb(data_dict)
            await self.__msg_socket.send(data_enc)
            # check if bidirectional patterns
            if self.__msg_pattern < 2 and not disable_confirmation:
                # then receive and log confirmation
                recv_confirmation = await self.__msg_socket.recv()
                recvd_conf = msgpack.unpackb(recv_confirmation, use_list=False)
                self.__logging and "terminated" in recvd_conf and logger.debug(
                    recvd_conf["terminated"]
                )
        # close socket
        self.__msg_socket.setsockopt(zmq.LINGER, 0)
        self.__msg_socket.close()
        # handle asyncio queues in bidirectional mode
        if self.__bi_mode:
            # empty queue if not
            while not self.__queue.empty():
                try:
                    self.__queue.get_nowait()
                except asyncio.QueueEmpty:
                    continue
                self.__queue.task_done()
            # join queues
            await self.__queue.join()

        logger.critical(
            "{} successfully terminated!".format(
                "Receive Mode" if self.__receive_mode else "Send Mode"
            )
        )

    def close(self, skip_loop: bool = False) -> None:
        """
        Terminates all NetGear_Async Asynchronous processes gracefully.

        Parameters:
            skip_loop (Boolean): (optional)used only if don't want to close eventloop(required in pytest).
        """
        # close event loop if specified
        if not (skip_loop):
            # close connection gracefully
            self.loop.run_until_complete(self.__terminate_connection())
            self.loop.close()
        else:
            # otherwise create a task
            asyncio.ensure_future(
                self.__terminate_connection(disable_confirmation=True)
            )

__init__(address=None, port=None, protocol='tcp', pattern=0, receive_mode=False, timeout=0.0, enablePiCamera=False, stabilize=False, source=None, camera_num=0, stream_mode=False, backend=0, colorspace=None, resolution=(640, 480), framerate=25, time_delay=0, logging=False, **options) ¶

This constructor method initializes the object state and attributes of the NetGear_Async class.

Parameters:

Name Type Description Default
address str

sets the valid network address of the Server/Client.

None
port str

sets the valid Network Port of the Server/Client.

None
protocol str

sets the valid messaging protocol between Server/Client.

'tcp'
pattern int

sets the supported messaging pattern(flow of communication) between Server/Client

0
receive_mode bool

select the NetGear_Async's Mode of operation.

False
timeout int / float

controls the maximum waiting time(in sec) after which Client throws TimeoutError.

0.0
enablePiCamera bool

provide access to PiGear(if True) or CamGear(if False) APIs respectively.

False
stabilize bool

enable access to Stabilizer Class for stabilizing frames.

False
camera_num int

selects the camera module index which will be used as Rpi source.

0
resolution tuple

sets the resolution (i.e. (width,height)) of the Rpi source.

(640, 480)
framerate int / float

sets the framerate of the Rpi source.

25
source based on input

defines the source for the input stream.

None
stream_mode bool

controls the exclusive YouTube Mode.

False
backend int

selects the backend for OpenCV's VideoCapture class.

0
colorspace str

selects the colorspace of the input stream.

None
logging bool

enables/disables logging.

False
time_delay int

time delay (in sec) before start reading the frames.

0
options dict

provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.

{}
Source code in vidgear/gears/asyncio/netgear_async.py
def __init__(
    self,
    # NetGear_Async parameters
    address: str = None,
    port: str = None,
    protocol: str = "tcp",
    pattern: int = 0,
    receive_mode: bool = False,
    timeout: Union[int, float] = 0.0,
    # Videogear parameters
    enablePiCamera: bool = False,
    stabilize: bool = False,
    source: Any = None,
    camera_num: int = 0,
    stream_mode: bool = False,
    backend: int = 0,
    colorspace: str = None,
    resolution: Tuple[int, int] = (640, 480),
    framerate: Union[int, float] = 25,
    time_delay: int = 0,
    # common parameters
    logging: bool = False,
    **options: dict
):
    """
    This constructor method initializes the object state and attributes of the NetGear_Async class.

    Parameters:
        address (str): sets the valid network address of the Server/Client.
        port (str): sets the valid Network Port of the Server/Client.
        protocol (str): sets the valid messaging protocol between Server/Client.
        pattern (int): sets the supported messaging pattern(flow of communication) between Server/Client
        receive_mode (bool): select the NetGear_Async's Mode of operation.
        timeout (int/float): controls the maximum waiting time(in sec) after which Client throws `TimeoutError`.
        enablePiCamera (bool): provide access to PiGear(if True) or CamGear(if False) APIs respectively.
        stabilize (bool): enable access to Stabilizer Class for stabilizing frames.
        camera_num (int): selects the camera module index which will be used as Rpi source.
        resolution (tuple): sets the resolution (i.e. `(width,height)`) of the Rpi source.
        framerate (int/float): sets the framerate of the Rpi source.
        source (based on input): defines the source for the input stream.
        stream_mode (bool): controls the exclusive YouTube Mode.
        backend (int): selects the backend for OpenCV's VideoCapture class.
        colorspace (str): selects the colorspace of the input stream.
        logging (bool): enables/disables logging.
        time_delay (int): time delay (in sec) before start reading the frames.
        options (dict): provides ability to alter Tweak Parameters of NetGear_Async, CamGear, PiGear & Stabilizer.
    """
    # enable logging if specified
    self.__logging = logging if isinstance(logging, bool) else False

    # print current version
    logcurr_vidgear_ver(logging=self.__logging)

    # raise error(s) for critical Class imports
    import_dependency_safe(
        "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
    )
    import_dependency_safe("msgpack" if msgpack is None else "")
    import_dependency_safe("msgpack_numpy" if m is None else "")

    # define valid messaging patterns => `0`: PAIR, `1`:(REQ, REP), `2`:(SUB, PUB), `3`:(PUSH, PULL)
    valid_messaging_patterns = {
        0: (zmq.PAIR, zmq.PAIR),
        1: (zmq.REQ, zmq.REP),
        2: (zmq.PUB, zmq.SUB),
        3: (zmq.PUSH, zmq.PULL),
    }

    # check whether user-defined messaging pattern is valid
    if isinstance(pattern, int) and pattern in valid_messaging_patterns:
        # assign value
        self.__msg_pattern = pattern
        self.__pattern = valid_messaging_patterns[pattern]
    else:
        # otherwise default to 0:`zmq.PAIR`
        self.__msg_pattern = 0
        self.__pattern = valid_messaging_patterns[self.__msg_pattern]
        self.__logging and logger.warning(
            "Invalid pattern {pattern}. Defaulting to `zmq.PAIR`!".format(
                pattern=pattern
            )
        )

    # check  whether user-defined messaging protocol is valid
    if isinstance(protocol, str) and protocol in ["tcp", "ipc"]:
        # assign value
        self.__protocol = protocol
    else:
        # else default to `tcp` protocol
        self.__protocol = "tcp"
        self.__logging and logger.warning("Invalid protocol. Defaulting to `tcp`!")

    # initialize Termination flag
    self.__terminate = False
    # initialize and assign `Receive Mode`
    self.__receive_mode = receive_mode
    # initialize stream handler
    self.__stream = None
    # initialize Messaging Socket
    self.__msg_socket = None
    # initialize NetGear_Async's configuration dictionary
    self.config = {}
    # asyncio queue handler
    self.__queue = None
    # define Bidirectional mode
    self.__bi_mode = False  # handles Bidirectional mode state

    # assign timeout for Receiver end
    if timeout and isinstance(timeout, (int, float)):
        self.__timeout = float(timeout)
    else:
        self.__timeout = 15.0

    # generate 8-digit random system id
    self.__id = "".join(
        secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
    )

    # Handle user-defined options dictionary values
    # reformat dictionary
    options = {str(k).strip(): v for k, v in options.items()}
    # handle bidirectional mode
    if "bidirectional_mode" in options:
        value = options["bidirectional_mode"]
        # also check if pattern and source is valid
        if isinstance(value, bool) and pattern < 2 and source is None:
            # activate Bidirectional mode if specified
            self.__bi_mode = value
        else:
            # otherwise disable it
            self.__bi_mode = False
            logger.warning("Bidirectional data transmission is disabled!")
        # handle errors and logging
        if pattern >= 2:
            # raise error
            raise ValueError(
                "[NetGear_Async:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                    pattern
                )
            )
        elif not (source is None):
            raise ValueError(
                "[NetGear_Async:ERROR] :: Custom source must be used when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                    pattern
                )
            )
        elif isinstance(value, bool) and self.__logging:
            # log Bidirectional mode activation
            logger.debug(
                "Bidirectional Data Transmission is {} for this connection!".format(
                    "enabled" if value else "disabled"
                )
            )
        else:
            logger.error("`bidirectional_mode` value is invalid!")
        # clean
        del options["bidirectional_mode"]

    # Setup and assign event loop policy
    if platform.system() == "Windows":
        # On Windows, VidGear requires the ``WindowsSelectorEventLoop``, but Python 3.8 and above,
        # defaults to an ``ProactorEventLoop`` loop that is not compatible with it. Thereby,
        # we had to set it manually.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    else:
        if not (uvloop is None):
            # Latest uvloop eventloop is only available for UNIX machines.
            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        else:
            # log if not present
            import_dependency_safe("uvloop", error="log")

    # Retrieve event loop and assign it
    try:
        self.loop = asyncio.get_running_loop()
    except RuntimeError:
        # otherwise create one
        logger.critical("No running event loop found. Creating a new one.")
        self.loop = asyncio.new_event_loop()

    # log eventloop for debugging
    self.__logging and logger.info(
        "Using ``{}`` event loop for this process.".format(
            self.loop.__class__.__name__
        )
    )

    # define messaging asynchronous Context
    self.__msg_context = zmq.asyncio.Context()

    # check whether `Receive Mode` is enabled
    if receive_mode:
        # assign local IP address if None
        if address is None:
            self.__address = "*"  # define address
        else:
            self.__address = address
        # assign default port address if None
        if port is None:
            self.__port = "5555"
        else:
            self.__port = port
    else:
        # Handle video source
        if source is None:
            self.config = {"generator": None}
            self.__logging and logger.warning("Given source is of NoneType!")
        else:
            # define stream with necessary params
            self.__stream = VideoGear(
                enablePiCamera=enablePiCamera,
                stabilize=stabilize,
                source=source,
                camera_num=camera_num,
                stream_mode=stream_mode,
                backend=backend,
                colorspace=colorspace,
                resolution=resolution,
                framerate=framerate,
                logging=logging,
                time_delay=time_delay,
                **options
            )
            # define default frame generator in configuration
            self.config = {"generator": self.__frame_generator()}
        # assign local ip address if None
        if address is None:
            self.__address = "localhost"
        else:
            self.__address = address
        # assign default port address if None
        if port is None:
            self.__port = "5555"
        else:
            self.__port = port
        # add server task handler
        self.task = None

    # create asyncio queue if bidirectional mode activated
    self.__queue = asyncio.Queue() if self.__bi_mode else None

close(skip_loop=False) ¶

Terminates all NetGear_Async Asynchronous processes gracefully.

Parameters:

Name Type Description Default
skip_loop Boolean

(optional)used only if don't want to close eventloop(required in pytest).

False
Source code in vidgear/gears/asyncio/netgear_async.py
def close(self, skip_loop: bool = False) -> None:
    """
    Terminates all NetGear_Async Asynchronous processes gracefully.

    Parameters:
        skip_loop (Boolean): (optional)used only if don't want to close eventloop(required in pytest).
    """
    # close event loop if specified
    if not (skip_loop):
        # close connection gracefully
        self.loop.run_until_complete(self.__terminate_connection())
        self.loop.close()
    else:
        # otherwise create a task
        asyncio.ensure_future(
            self.__terminate_connection(disable_confirmation=True)
        )

launch() ¶

Launches an asynchronous generators and loop executors for respective task.

Source code in vidgear/gears/asyncio/netgear_async.py
def launch(self) -> T:
    """
    Launches an asynchronous generators and loop executors for respective task.
    """
    # check if receive mode enabled
    if self.__receive_mode:
        self.__logging and logger.debug(
            "Launching NetGear_Async asynchronous generator!"
        )
        # run loop executor for Receiver asynchronous generator
        self.loop.run_in_executor(None, self.recv_generator)
    else:
        # Otherwise launch Server handler
        self.__logging and logger.debug(
            "Creating NetGear_Async asynchronous server handler!"
        )
        # create task for Server Handler
        self.task = self.loop.create_task(self.__server_handler())
    # return instance
    return self

recv_generator() async ¶

A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.

Source code in vidgear/gears/asyncio/netgear_async.py
async def recv_generator(self) -> AsyncGenerator[Tuple[Any, NDArray], NDArray]:
    """
    A default Asynchronous Frame Generator for NetGear_Async's Receiver-end.
    """
    # check whether `receive mode` is activated
    if not (self.__receive_mode):
        # raise Value error and exit
        self.__terminate = True
        raise ValueError(
            "[NetGear_Async:ERROR] :: `recv_generator()` function cannot be accessed while `receive_mode` is disabled. Kindly refer vidgear docs!"
        )

    # initialize and define messaging socket
    self.__msg_socket = self.__msg_context.socket(self.__pattern[1])

    # define exclusive socket options for patterns
    if self.__msg_pattern == 2:
        self.__msg_socket.set_hwm(1)
        self.__msg_socket.setsockopt(zmq.SUBSCRIBE, b"")

    try:
        # bind socket to the assigned protocol, address and port
        self.__msg_socket.bind(
            self.__protocol + "://" + str(self.__address) + ":" + str(self.__port)
        )
        # finally log progress
        self.__logging and logger.debug(
            "Successfully binded to address: {} with pattern: {}.".format(
                (
                    self.__protocol
                    + "://"
                    + str(self.__address)
                    + ":"
                    + str(self.__port)
                ),
                self.__msg_pattern,
            )
        )
        logger.critical("Receive Mode is activated successfully!")
    except Exception as e:
        logger.exception(str(e))
        raise RuntimeError(
            "[NetGear_Async:ERROR] :: Failed to bind address: {} and pattern: {}{}!".format(
                (
                    self.__protocol
                    + "://"
                    + str(self.__address)
                    + ":"
                    + str(self.__port)
                ),
                self.__msg_pattern,
                " and Bidirectional Mode enabled" if self.__bi_mode else "",
            )
        )

    # loop until terminated
    while not self.__terminate:
        # get encoded data message from server withing timeout limit
        datamsg_encoded = await asyncio.wait_for(
            self.__msg_socket.recv(), timeout=self.__timeout
        )
        # retrieve data from message
        data = msgpack.unpackb(datamsg_encoded, use_list=False)
        # terminate if exit` flag received from server
        if data["terminate"]:
            # send confirmation message to server if bidirectional patterns
            if self.__msg_pattern < 2:
                # create termination confirmation message
                return_dict = dict(
                    terminated="Client-`{}` successfully terminated!".format(
                        self.__id
                    ),
                )
                # encode message
                retdata_enc = msgpack.packb(return_dict)
                # send message back to server
                await self.__msg_socket.send(retdata_enc)
            self.__logging and logger.info(
                "Termination signal received from server!"
            )
            # break loop and terminate
            self.__terminate = True
            break
        # get encoded frame message from server withing timeout limit
        framemsg_encoded = await asyncio.wait_for(
            self.__msg_socket.recv_multipart(), timeout=self.__timeout
        )
        # retrieve frame from message
        frame = msgpack.unpackb(
            framemsg_encoded[0], use_list=False, object_hook=m.decode
        )

        # check if bidirectional patterns
        if self.__msg_pattern < 2:
            # handle bidirectional data transfer if enabled
            if self.__bi_mode and data["bi_mode"]:
                # handle empty queue
                if not self.__queue.empty():
                    return_data = await self.__queue.get()
                    self.__queue.task_done()
                else:
                    return_data = None
                # check if we are returning `ndarray` frames
                if not (return_data is None) and isinstance(
                    return_data, np.ndarray
                ):
                    # check whether the incoming frame is contiguous
                    if not (return_data.flags["C_CONTIGUOUS"]):
                        return_data = np.ascontiguousarray(
                            return_data, dtype=return_data.dtype
                        )

                    # create return type dict without data
                    rettype_dict = dict(
                        return_type=(type(return_data).__name__), return_data=None,
                    )
                    # encode it
                    rettype_enc = msgpack.packb(rettype_dict)
                    # send it to server with correct flags
                    await self.__msg_socket.send(rettype_enc, flags=zmq.SNDMORE)

                    # encode return ndarray data
                    retframe_enc = msgpack.packb(return_data, default=m.encode)
                    # send it over network to server
                    await self.__msg_socket.send_multipart([retframe_enc])
                else:
                    # otherwise create type and data dict
                    return_dict = dict(
                        return_type=(type(return_data).__name__),
                        return_data=(
                            return_data if not (return_data is None) else ""
                        ),
                    )
                    # encode it
                    retdata_enc = msgpack.packb(return_dict)
                    # send it over network to server
                    await self.__msg_socket.send(retdata_enc)
            elif self.__bi_mode or data["bi_mode"]:
                # raise error if bidirectional mode is disabled at server or client but not both
                raise RuntimeError(
                    "[NetGear_Async:ERROR] :: Invalid configuration! Bidirectional Mode is not activate on {} end.".format(
                        "client" if self.__bi_mode else "server"
                    )
                )
            else:
                # otherwise just send confirmation message to server
                await self.__msg_socket.send(
                    bytes(
                        "Data received on client: {} !".format(self.__id), "utf-8"
                    )
                )
        # yield received tuple(data-frame) if bidirectional mode or else just frame
        if self.__bi_mode:
            yield (data["data"], frame) if data["data"] else (None, frame)
        else:
            yield frame
        # sleep for sometime
        await asyncio.sleep(0)

transceive_data(data=None) async ¶

Bidirectional Mode exclusive method to Transmit data (in Receive mode) and Receive data (in Send mode).

Parameters:

Name Type Description Default
data any

inputs data (of any datatype) for sending back to Server.

None
Source code in vidgear/gears/asyncio/netgear_async.py
async def transceive_data(self, data: Any = None) -> Any:
    """
    Bidirectional Mode exclusive method to Transmit data _(in Receive mode)_ and Receive data _(in Send mode)_.

    Parameters:
        data (any): inputs data _(of any datatype)_ for sending back to Server.
    """
    recvd_data = None
    if not self.__terminate:
        if self.__bi_mode:
            if self.__receive_mode:
                await self.__queue.put(data)
            else:
                if not self.__queue.empty():
                    recvd_data = await self.__queue.get()
                    self.__queue.task_done()
        else:
            logger.error(
                "`transceive_data()` function cannot be used when Bidirectional Mode is disabled."
            )
    return recvd_data

 

Was this page helpful?