
NetGear API References

NetGear API usage examples can be found here ➶

NetGear API parameters are explained here ➶

NetGear is exclusively designed to transfer video frames synchronously and asynchronously between interconnecting systems over the network in real-time.

NetGear implements a high-level wrapper around the PyZMQ Python library, which provides Python bindings for ZeroMQ: a high-performance asynchronous distributed messaging library that provides a message queue but, unlike message-oriented middleware, can run without a dedicated message broker.

NetGear also supports real-time Frame Compression capabilities for optimizing performance while sending the frames directly over the network, by encoding the frame before sending it and decoding it on the client's end automatically in real-time.

Info

NetGear API now internally implements a robust Lazy Pirate pattern (auto-reconnection) for its synchronous messaging patterns (i.e. zmq.PAIR & zmq.REQ/zmq.REP) at both the Server and Client ends. Instead of doing a blocking receive, the API will:

  • Poll the socket and receive from it only when it's sure a reply has arrived.
  • Attempt to reconnect, if no reply has arrived within a timeout period.
  • Abandon the connection if there is still no reply after several requests.
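
The three steps above can be sketched without any networking code. The snippet below is a minimal, transport-agnostic illustration and not NetGear's actual implementation: `send`, `replies`, and `reconnect` are hypothetical stand-ins for a socket's send call, its incoming message queue, and a routine that reopens the socket. The default retry count and timeout mirror the values the constructor sets for synchronous patterns (3 retries, 4 seconds).

```python
import queue

# Defaults mirror the values NetGear's constructor sets for synchronous patterns.
MAX_RETRIES = 3
REQUEST_TIMEOUT = 4.0  # seconds (the constructor stores this as 4000 ms)


def lazy_pirate_request(send, replies, reconnect, payload,
                        max_retries=MAX_RETRIES, timeout=REQUEST_TIMEOUT):
    """Send `payload` and wait for a reply, retrying instead of blocking forever.

    `send`, `replies` (a queue.Queue) and `reconnect` are hypothetical
    stand-ins for the real socket operations.
    """
    send(payload)
    for attempt in range(max_retries + 1):
        try:
            # Poll with a timeout; receive only once a reply is available.
            return replies.get(timeout=timeout)
        except queue.Empty:
            if attempt == max_retries:
                break
            # No reply within the timeout: reconnect and resend the request.
            reconnect()
            send(payload)
    # Still no reply after several requests: abandon the connection.
    raise ConnectionError("no reply after {} retries".format(max_retries))
```

In this sketch `max_retries` counts resends after the first attempt, so the default allows four sends in total before giving up; how NetGear counts its own retries internally may differ.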

NetGear as of now seamlessly supports three ZeroMQ messaging patterns:

  • zmq.PAIR (ZMQ Pair Pattern)
  • zmq.REQ/zmq.REP (ZMQ Request/Reply Pattern)
  • zmq.PUB/zmq.SUB (ZMQ Publish/Subscribe Pattern)

The supported protocols are tcp and ipc.
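
As a rough illustration of how these choices are resolved, the sketch below mirrors the fallback behaviour visible in the constructor source on this page: an unrecognised `pattern` falls back to `0` (zmq.PAIR) and an unsupported or missing `protocol` falls back to `tcp`. The function name `resolve_connection` is hypothetical, and the socket types are written as plain strings so the snippet runs without pyzmq installed.

```python
# Integer pattern values accepted by NetGear and the socket-type pair each maps to.
# Socket types are plain strings here (an assumption for portability).
VALID_PATTERNS = {
    0: ("PAIR", "PAIR"),
    1: ("REQ", "REP"),
    2: ("PUB", "SUB"),
}
VALID_PROTOCOLS = ("tcp", "ipc")


def resolve_connection(pattern=0, protocol=None):
    """Return (pattern, socket_pair, protocol), using defaults for invalid input."""
    if not (isinstance(pattern, int) and pattern in VALID_PATTERNS):
        pattern = 0  # default to zmq.PAIR
    if protocol not in VALID_PROTOCOLS:
        protocol = "tcp"  # default to tcp
    return pattern, VALID_PATTERNS[pattern], protocol


print(resolve_connection(1, "ipc"))
print(resolve_connection(7, "udp"))
```

The second call shows the fallback path: both values are invalid, so the defaults are used rather than an error being raised.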

Modes of Operation
  • Primary Modes

    NetGear API primarily has two modes of operations:

    • Send Mode: uses the send() function to send video frames over the network in real-time.

    • Receive Mode: uses the recv() function to receive frames sent over the network by Send Mode in real-time. In some patterns, this mode sends back a confirmation when a frame is received successfully.

  • Exclusive Modes

    In addition to these primary modes, NetGear API offers application-specific Exclusive Modes:

    • Multi-Servers Mode: In this exclusive mode, NetGear API robustly handles multiple servers at once, thereby providing seamless access to frames and unidirectional data transfer from multiple Servers/Publishers across the network in real-time.

    • Multi-Clients Mode: In this exclusive mode, NetGear API robustly handles multiple clients at once, thereby providing seamless access to frames and unidirectional data transfer to multiple Clients/Consumers across the network in real-time.

    • Bidirectional Mode: This exclusive mode provides seamless support for bidirectional data transmission between Server and Client along with video frames.

    • Secure Mode: In this exclusive mode, NetGear API provides easy access to ZeroMQ's powerful, smart & secure Security Layers, which enable strong encryption of data and unbreakable authentication between the Server and Client with the help of custom certificates/keys, bringing cheap, standardized privacy and authentication to distributed systems over the network.
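
Not every exclusive mode works with every messaging pattern. The sketch below condenses the compatibility checks that appear in the constructor source on this page into a single hypothetical helper, `check_exclusive_modes`; it is a simplified illustration, not part of the NetGear API. For instance, it only rejects a mode when that mode is actually switched on.

```python
def check_exclusive_modes(pattern, multiserver_mode=False, multiclient_mode=False,
                          bidirectional_mode=False, ssh_tunnel_mode=None):
    """Raise ValueError for mode/pattern combinations the constructor rejects."""
    # Multi-Server and Multi-Client Modes need pattern 1 (REQ/REP) or 2 (PUB/SUB).
    if (multiserver_mode or multiclient_mode) and pattern == 0:
        raise ValueError("Multi-Server/Multi-Client Mode requires pattern 1 or 2")
    # Bidirectional Mode needs a synchronous pattern: 0 (PAIR) or 1 (REQ/REP).
    if bidirectional_mode and pattern == 2:
        raise ValueError("Bidirectional Mode requires pattern 0 or 1")
    # The two multi modes are mutually exclusive.
    if multiserver_mode and multiclient_mode:
        raise ValueError("Multi-Client and Multi-Server Mode cannot be enabled simultaneously")
    # SSH Tunneling cannot be combined with either multi mode.
    if ssh_tunnel_mode and (multiserver_mode or multiclient_mode):
        raise ValueError("SSH Tunneling cannot be combined with Multi-Server/Multi-Client Mode")
    return True


# Multi-Server plus Bidirectional is accepted on pattern 1 (REQ/REP).
print(check_exclusive_modes(1, multiserver_mode=True, bidirectional_mode=True))
```

Because the multi modes need pattern 1 or 2 and Bidirectional Mode needs pattern 0 or 1, pattern 1 is the only one on which the two can be combined.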

Source code in vidgear/gears/netgear.py
class NetGear:
    """
    NetGear is exclusively designed to transfer video frames synchronously and asynchronously between interconnecting systems over the network in real-time.

    NetGear implements a high-level wrapper around the PyZMQ Python library, which provides Python bindings for ZeroMQ: a high-performance asynchronous distributed messaging library
    that provides a message queue but, unlike message-oriented middleware, can run without a dedicated message broker.

    NetGear also supports real-time Frame Compression capabilities for optimizing performance while sending the frames directly over the network, by encoding the frame before sending
    it and decoding it on the client's end automatically in real-time.

    !!! info
        NetGear API now internally implements a robust *Lazy Pirate pattern* (auto-reconnection) for its synchronous messaging patterns _(i.e. `zmq.PAIR` & `zmq.REQ/zmq.REP`)_
        at both the Server and Client ends. Instead of doing a blocking receive, the API will:

        * Poll the socket and receive from it only when it's sure a reply has arrived.
        * Attempt to reconnect, if no reply has arrived within a timeout period.
        * Abandon the connection if there is still no reply after several requests.

    NetGear as of now seamlessly supports three ZeroMQ messaging patterns:

    - `zmq.PAIR` _(ZMQ Pair Pattern)_
    - `zmq.REQ/zmq.REP` _(ZMQ Request/Reply Pattern)_
    - `zmq.PUB/zmq.SUB` _(ZMQ Publish/Subscribe Pattern)_

    _The supported protocols are `tcp` and `ipc`._

    ??? tip "Modes of Operation"

        * **Primary Modes**

            NetGear API primarily has two modes of operations:

            * **Send Mode:** _uses the `send()` function to send video frames over the network in real-time._

            * **Receive Mode:** _uses the `recv()` function to receive frames sent over the network by *Send Mode* in real-time. In some patterns, this mode sends back a
            confirmation when a frame is received successfully._

        * **Exclusive Modes**

            In addition to these primary modes, NetGear API offers application-specific Exclusive Modes:

            * **Multi-Servers Mode:** _In this exclusive mode, NetGear API robustly **handles multiple servers at once**, thereby providing seamless access to frames and unidirectional
            data transfer from multiple Servers/Publishers across the network in real-time._

            * **Multi-Clients Mode:** _In this exclusive mode, NetGear API robustly **handles multiple clients at once**, thereby providing seamless access to frames and unidirectional
            data transfer to multiple Clients/Consumers across the network in real-time._

            * **Bidirectional Mode:** _This exclusive mode **provides seamless support for bidirectional data transmission between Server and Client along with video frames**._

            * **Secure Mode:** _In this exclusive mode, NetGear API **provides easy access to ZeroMQ's powerful, smart & secure Security Layers**, which enable strong encryption of
            data and unbreakable authentication between the Server and Client with the help of custom certificates/keys, bringing cheap, standardized privacy and authentication
            to distributed systems over the network._
    """

    def __init__(
        self,
        address: str = None,
        port: str = None,
        protocol: str = None,
        pattern: int = 0,
        receive_mode: bool = False,
        logging: bool = False,
        **options: dict
    ):
        """
        This constructor method initializes the object state and attributes of the NetGear class.

        Parameters:
            address (str): sets the valid network address of the Server/Client.
            port (str): sets the valid Network Port of the Server/Client.
            protocol (str): sets the valid messaging protocol between Server/Client.
            pattern (int): sets the supported messaging pattern (flow of communication) between Server/Client.
            receive_mode (bool): selects NetGear's mode of operation (`True` for Receive Mode, `False` for Send Mode).
            logging (bool): enables/disables logging.
            options (dict): provides the flexibility to alter various NetGear internal properties.
        """
        # enable logging if specified
        self.__logging = logging if isinstance(logging, bool) else False

        # print current version
        logcurr_vidgear_ver(logging=self.__logging)

        # raise error(s) for critical Class imports
        import_dependency_safe(
            "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
        )
        import_dependency_safe(
            "simplejpeg" if simplejpeg is None else "", error="log", min_version="1.6.1"
        )

        # define valid messaging patterns => `0`: zmq.PAIR, `1`: (zmq.REQ, zmq.REP), and `2`: (zmq.PUB, zmq.SUB)
        valid_messaging_patterns = {
            0: (zmq.PAIR, zmq.PAIR),
            1: (zmq.REQ, zmq.REP),
            2: (zmq.PUB, zmq.SUB),
        }

        # Handle messaging pattern
        msg_pattern = None
        # check whether user-defined messaging pattern is valid
        if isinstance(pattern, int) and pattern in valid_messaging_patterns.keys():
            # assign value
            msg_pattern = valid_messaging_patterns[pattern]
        else:
            # otherwise default to 0:`zmq.PAIR`
            pattern = 0
            msg_pattern = valid_messaging_patterns[pattern]
            self.__logging and logger.warning(
                "Wrong pattern value, Defaulting to `zmq.PAIR`! Kindly refer Docs for more Information."
            )
        # assign pattern to global parameter for further use
        self.__pattern = pattern

        # Handle messaging protocol
        if protocol is None or not (protocol in ["tcp", "ipc"]):
            # else default to `tcp` protocol
            protocol = "tcp"
            # log it
            self.__logging and logger.warning(
                "Protocol is not supported or not provided. Defaulting to `tcp` protocol!"
            )

        # Handle connection params

        self.__msg_flag = 0  # handles connection flags
        self.__msg_copy = False  # handles whether to copy data
        self.__msg_track = False  # handles whether to track packets

        # Handle NetGear's internal exclusive modes and params

        # define Secure Mode
        self.__z_auth = None

        # define SSH Tunneling Mode
        self.__ssh_tunnel_mode = None  # handles ssh_tunneling mode state
        self.__ssh_tunnel_pwd = None
        self.__ssh_tunnel_keyfile = None
        self.__paramiko_present = False if paramiko is None else True

        # define Multi-Server mode
        self.__multiserver_mode = False  # handles multi-server mode state

        # define Multi-Client mode
        self.__multiclient_mode = False  # handles multi-client mode state

        # define Bidirectional mode
        self.__bi_mode = False  # handles Bidirectional mode state

        # define Secure mode
        valid_security_mech = {0: "Grasslands", 1: "StoneHouse", 2: "IronHouse"}
        self.__secure_mode = 0  # handles ZMQ security layer status
        auth_cert_dir = ""  # handles valid ZMQ certificates dir
        self.__auth_publickeys_dir = ""  # handles valid ZMQ public certificates dir
        self.__auth_secretkeys_dir = ""  # handles valid ZMQ private certificates dir
        overwrite_cert = False  # checks if certificates overwriting allowed
        custom_cert_location = ""  # handles custom ZMQ certificates path

        # define frame-compression handler
        self.__jpeg_compression = (
            True if not (simplejpeg is None) else False
        )  # enabled by default for all connections if simplejpeg is installed
        self.__jpeg_compression_quality = 90  # 90% quality
        self.__jpeg_compression_fastdct = True  # fastest DCT on by default
        self.__jpeg_compression_fastupsample = False  # fastupsample off by default
        self.__jpeg_compression_colorspace = "BGR"  # use BGR colorspace by default

        # defines frame compression on return data
        self.__ex_compression_params = None

        # define receiver return data handler
        self.__return_data = None

        # generate 8-character random alphanumeric system id
        self.__id = "".join(
            secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8)
        )

        # define termination flag
        self.__terminate = False

        # additional settings for reliability
        if pattern < 2:
            # define zmq poller for reliable transmission
            self.__poll = zmq.Poller()
            # define max retries
            self.__max_retries = 3
            # request timeout
            self.__request_timeout = 4000  # 4 secs
        else:
            # subscriber timeout
            self.__subscriber_timeout = None

        # Handle user-defined options dictionary values
        # reformat dictionary
        options = {str(k).strip(): v for k, v in options.items()}

        # loop over dictionary key & values and assign to global variables if valid
        for key, value in options.items():
            # handle multi-server mode
            if key == "multiserver_mode" and isinstance(value, bool):
                # check if valid pattern assigned
                if pattern > 0:
                    # activate Multi-server mode
                    self.__multiserver_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__multiserver_mode = False
                    logger.critical("Multi-Server Mode is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Server Mode is enabled. Kindly refer Docs for more Information.".format(
                            pattern
                        )
                    )

            # handle multi-client mode
            elif key == "multiclient_mode" and isinstance(value, bool):
                # check if valid pattern assigned
                if pattern > 0:
                    # activate Multi-client mode
                    self.__multiclient_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__multiclient_mode = False
                    logger.critical("Multi-Client Mode is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Client Mode is enabled. Kindly refer Docs for more Information.".format(
                            pattern
                        )
                    )

            # handle bidirectional mode
            elif key == "bidirectional_mode" and isinstance(value, bool):
                # check if pattern is valid
                if pattern < 2:
                    # activate Bidirectional mode if specified
                    self.__bi_mode = value
                else:
                    # otherwise disable it and raise error
                    self.__bi_mode = False
                    logger.warning("Bidirectional data transmission is disabled!")
                    raise ValueError(
                        "[NetGear:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                            pattern
                        )
                    )

            # handle secure mode
            elif (
                key == "secure_mode"
                and isinstance(value, int)
                and (value in valid_security_mech)
            ):
                self.__secure_mode = value

            elif key == "custom_cert_location" and isinstance(value, str):
                # verify custom auth certificates path for secure mode
                custom_cert_location = os.path.abspath(value)
                assert os.path.isdir(
                    custom_cert_location
                ), "[NetGear:ERROR] :: `custom_cert_location` value must be the path to a valid directory!"
                assert check_WriteAccess(
                    custom_cert_location,
                    is_windows=True if os.name == "nt" else False,
                    logging=self.__logging,
                ), "[NetGear:ERROR] :: Permission Denied!, cannot write ZMQ authentication certificates to '{}' directory!".format(
                    value
                )
            elif key == "overwrite_cert" and isinstance(value, bool):
                # enable/disable auth certificate overwriting in secure mode
                overwrite_cert = value

            # handle ssh-tunneling mode
            elif key == "ssh_tunnel_mode" and isinstance(value, str):
                # enable SSH Tunneling Mode
                self.__ssh_tunnel_mode = value.strip()
            elif key == "ssh_tunnel_pwd" and isinstance(value, str):
                # add valid SSH Tunneling password
                self.__ssh_tunnel_pwd = value
            elif key == "ssh_tunnel_keyfile" and isinstance(value, str):
                # add valid SSH Tunneling key-file
                self.__ssh_tunnel_keyfile = value if os.path.isfile(value) else None
                if self.__ssh_tunnel_keyfile is None:
                    logger.warning(
                        "Discarded invalid or non-existent SSH Tunnel Key-file at {}!".format(
                            value
                        )
                    )

            # handle jpeg compression
            elif (
                key == "jpeg_compression"
                and not (simplejpeg is None)
                and isinstance(value, (bool, str))
            ):
                if isinstance(value, str) and value.strip().upper() in [
                    "RGB",
                    "BGR",
                    "RGBX",
                    "BGRX",
                    "XBGR",
                    "XRGB",
                    "GRAY",
                    "RGBA",
                    "BGRA",
                    "ABGR",
                    "ARGB",
                    "CMYK",
                ]:
                    # set encoding colorspace
                    self.__jpeg_compression_colorspace = value.strip().upper()
                    # enable frame-compression encoding value
                    self.__jpeg_compression = True
                else:
                    # enable frame-compression encoding value
                    self.__jpeg_compression = value
            elif key == "jpeg_compression_quality" and isinstance(value, (int, float)):
                # set valid jpeg quality
                if value >= 10 and value <= 100:
                    self.__jpeg_compression_quality = int(value)
                else:
                    logger.warning("Skipped invalid `jpeg_compression_quality` value!")
            elif key == "jpeg_compression_fastdct" and isinstance(value, bool):
                # enable jpeg fastdct
                self.__jpeg_compression_fastdct = value
            elif key == "jpeg_compression_fastupsample" and isinstance(value, bool):
                # enable jpeg  fastupsample
                self.__jpeg_compression_fastupsample = value

            # assign maximum retries in synchronous patterns
            elif key == "max_retries" and isinstance(value, int) and pattern < 2:
                if value >= 0:
                    self.__max_retries = value
                else:
                    logger.warning("Invalid `max_retries` value skipped!")

            # assign request timeout in synchronous patterns
            elif key == "request_timeout" and isinstance(value, int) and pattern < 2:
                if value >= 4:
                    self.__request_timeout = value * 1000  # convert to milliseconds
                else:
                    logger.warning("Invalid `request_timeout` value skipped!")

            # assign subscriber timeout
            elif (
                key == "subscriber_timeout" and isinstance(value, int) and pattern == 2
            ):
                if value > 0:
                    self.__subscriber_timeout = value * 1000  # convert to milliseconds
                else:
                    logger.warning("Invalid `subscriber_timeout` value skipped!")

            # handle ZMQ flags
            elif key == "flag" and isinstance(value, int):
                self.__msg_flag = value
                self.__msg_flag and logger.warning(
                    "The flag optional value is set to `1` (NOBLOCK) for this run. This might cause NetGear to not terminate gracefully."
                )
            elif key == "copy" and isinstance(value, bool):
                self.__msg_copy = value
            elif key == "track" and isinstance(value, bool):
                self.__msg_track = value
                self.__msg_copy and self.__msg_track and logger.info(
                    "The `track` optional value will be ignored for this run because `copy=True` is also defined."
                )
            else:
                pass

        # Handle ssh tunneling if enabled
        if not (self.__ssh_tunnel_mode is None):
            # SSH Tunnel Mode only available for server mode
            if receive_mode:
                logger.error("SSH Tunneling cannot be enabled for Client-end!")
            else:
                # check if SSH tunneling possible
                ssh_address = self.__ssh_tunnel_mode
                ssh_address, ssh_port = (
                    ssh_address.split(":")
                    if ":" in ssh_address
                    else [ssh_address, "22"]
                )  # default to port 22
                if ssh_port == "47":
                    self.__ssh_tunnel_mode = self.__ssh_tunnel_mode.replace(
                        ":47", ""
                    )  # port-47 is reserved for testing
                else:
                    # extract ip for validation
                    ssh_user, ssh_ip = (
                        ssh_address.split("@")
                        if "@" in ssh_address
                        else ["", ssh_address]
                    )
                    # validate ip specified port
                    assert check_open_port(
                        ssh_ip, port=int(ssh_port)
                    ), "[NetGear:ERROR] :: Host `{}` is not available for SSH Tunneling at port-{}!".format(
                        ssh_address, ssh_port
                    )

        # Handle multiple exclusive modes if enabled
        if self.__multiclient_mode and self.__multiserver_mode:
            raise ValueError(
                "[NetGear:ERROR] :: Multi-Client and Multi-Server Mode cannot be enabled simultaneously!"
            )
        elif self.__multiserver_mode or self.__multiclient_mode:
            # check if Bidirectional Mode also enabled
            if self.__bi_mode:
                # log it
                self.__logging and logger.debug(
                    "Bidirectional Data Transmission is also enabled for this connection!"
                )
            # check if SSH Tunneling Mode also enabled
            if self.__ssh_tunnel_mode:
                # raise error
                raise ValueError(
                    "[NetGear:ERROR] :: SSH Tunneling and {} Mode cannot be enabled simultaneously. Kindly refer docs!".format(
                        "Multi-Server" if self.__multiserver_mode else "Multi-Client"
                    )
                )
        elif self.__bi_mode:
            # log Bidirectional mode activation
            self.__logging and logger.debug(
                "Bidirectional Data Transmission is enabled for this connection!"
            )
        elif self.__ssh_tunnel_mode:
            # log SSH Tunneling mode activation
            self.__logging and logger.debug(
                "SSH Tunneling is enabled for host:`{}` with `{}` back-end.".format(
                    self.__ssh_tunnel_mode,
                    "paramiko" if self.__paramiko_present else "pexpect",
                )
            )

        # On Windows, NetGear requires the ``WindowsSelectorEventLoop``, but Python 3.8 and above
        # default to the ``ProactorEventLoop``, which is not compatible with it. Therefore,
        # we set the event loop policy manually.
        platform.system() == "Windows" and asyncio.set_event_loop_policy(
            asyncio.WindowsSelectorEventLoopPolicy()
        )

        # define ZMQ messaging context instance
        self.__msg_context = zmq.Context.instance()

        # initialize and assign receive mode to global variable
        self.__receive_mode = receive_mode

        # Handle Secure mode
        if self.__secure_mode > 0:
            # certificate overwriting is only allowed at the Server's end
            if receive_mode:
                # warn first (while the user's choice is still known), then disable
                overwrite_cert and logger.warning(
                    "Overwriting ZMQ Authentication certificates is disabled for Client's end!"
                )
                overwrite_cert = False
            else:
                overwrite_cert and self.__logging and logger.info(
                    "Overwriting ZMQ Authentication certificates over previous ones!"
                )

            # Validate certificate generation paths
            # Start threaded authenticator for this context
            try:
                # check if custom certificates path is specified
                if custom_cert_location:
                    (
                        auth_cert_dir,
                        self.__auth_secretkeys_dir,
                        self.__auth_publickeys_dir,
                    ) = generate_auth_certificates(
                        custom_cert_location, overwrite=overwrite_cert, logging=logging
                    )
                else:
                    # otherwise auto-generate suitable path
                    (
                        auth_cert_dir,
                        self.__auth_secretkeys_dir,
                        self.__auth_publickeys_dir,
                    ) = generate_auth_certificates(
                        os.path.join(expanduser("~"), ".vidgear"),
                        overwrite=overwrite_cert,
                        logging=logging,
                    )
                # log it
                self.__logging and logger.debug(
                    "`{}` is the default location for storing ZMQ authentication certificates/keys.".format(
                        auth_cert_dir
                    )
                )

                # start an authenticator for this context
                self.__z_auth = ThreadAuthenticator(self.__msg_context)
                self.__z_auth.start()
                self.__z_auth.allow(str(address))  # allow current address

                # check if `IronHouse` is activated
                if self.__secure_mode == 2:
                    # tell authenticator to use the certificate from given valid dir
                    self.__z_auth.configure_curve(
                        domain="*", location=self.__auth_publickeys_dir
                    )
                else:
                    # otherwise tell the authenticator how to handle the CURVE requests, if `StoneHouse` is activated
                    self.__z_auth.configure_curve(
                        domain="*", location=auth.CURVE_ALLOW_ANY
                    )
            except zmq.ZMQError as e:
                if "Address in use" in str(e):
                    logger.info("ZMQ Authenticator already running.")
                else:
                    # catch if any error occurred and disable Secure mode
                    logger.exception(str(e))
                    self.__secure_mode = 0
                    logger.error(
                        "ZMQ Security Mechanism is disabled for this connection due to errors!"
                    )

        # check whether `receive_mode` is enabled
        if self.__receive_mode:
            # define connection address
            address = "*" if address is None else address

            # check if multiserver_mode is enabled
            if self.__multiserver_mode:
                # check if unique server port address list/tuple is assigned or not in multiserver_mode
                if port is None or not isinstance(port, (tuple, list)):
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Server ports while Multi-Server mode is enabled. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Server Mode at PORTS: {}!".format(port)
                    )
                # create port address buffer for keeping track of connected server port(s)
                self.__port_buffer = []
            # check if multiclient_mode is enabled
            elif self.__multiclient_mode:
                # check if unique server port address is assigned or not in multiclient_mode
                if port is None:
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Client-end. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Client Mode at PORT: {} on this device!".format(
                            port
                        )
                    )
                # assign value to global variable
                self.__port = port
            else:
                # otherwise assign local port address if None
                port = "5555" if port is None else port

            try:
                # define thread-safe messaging socket
                self.__msg_socket = self.__msg_context.socket(msg_pattern[1])

                # set high-water mark to 1 for the pub/sub pattern
                self.__pattern == 2 and self.__msg_socket.set_hwm(1)

                # enable specified secure mode for the socket
                if self.__secure_mode > 0:
                    # load server key
                    server_secret_file = os.path.join(
                        self.__auth_secretkeys_dir, "server.key_secret"
                    )
                    server_public, server_secret = auth.load_certificate(
                        server_secret_file
                    )
                    # load all CURVE keys
                    self.__msg_socket.curve_secretkey = server_secret
                    self.__msg_socket.curve_publickey = server_public
                    # enable CURVE connection for this socket
                    self.__msg_socket.curve_server = True

                # define exclusive socket options for `pattern=2`
                if self.__pattern == 2:
                    self.__msg_socket.setsockopt_string(zmq.SUBSCRIBE, "")
                    self.__subscriber_timeout and self.__msg_socket.setsockopt(
                        zmq.RCVTIMEO, self.__subscriber_timeout
                    )
                    self.__subscriber_timeout and self.__msg_socket.setsockopt(
                        zmq.LINGER, 0
                    )

                # if multiserver_mode is enabled, then assign port addresses to zmq socket
                if self.__multiserver_mode:
                    # bind socket to given server protocol, address and ports
                    for pt in port:
                        self.__msg_socket.bind(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    # bind socket to given protocol, address and port normally
                    self.__msg_socket.bind(
                        protocol + "://" + str(address) + ":" + str(port)
                    )

                # additional settings
                if self.__pattern < 2:
                    if self.__multiserver_mode:
                        self.__connection_address = []
                        for pt in port:
                            self.__connection_address.append(
                                protocol + "://" + str(address) + ":" + str(pt)
                            )
                    else:
                        self.__connection_address = (
                            protocol + "://" + str(address) + ":" + str(port)
                        )
                    self.__msg_pattern = msg_pattern[1]
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)
                    self.__logging and logger.debug(
                        "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                            self.__max_retries, self.__request_timeout / 1000
                        )
                    )
                else:
                    self.__logging and self.__subscriber_timeout and logger.debug(
                        "Timeout: {} secs is enabled for this system.".format(
                            self.__subscriber_timeout / 1000
                        )
                    )

            except Exception as e:
                # otherwise log and raise error
                logger.exception(str(e))
                # Handle Secure Mode
                self.__secure_mode and logger.critical(
                    "Failed to activate Secure Mode: `{}` for this connection!".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                # raise errors for exclusive modes
                if self.__multiserver_mode or self.__multiclient_mode:
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Receive Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                            (
                                "Multi-Server"
                                if self.__multiserver_mode
                                else "Multi-Client"
                            ),
                            (protocol + "://" + str(address) + ":" + str(port)),
                            pattern,
                        )
                    )
                else:
                    self.__bi_mode and logger.critical(
                        "Failed to activate Bidirectional Mode for this connection!"
                    )
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Receive Mode failed to bind address: {} and pattern: {}! Kindly recheck all parameters.".format(
                            (protocol + "://" + str(address) + ":" + str(port)), pattern
                        )
                    )

            # Handle threaded queue mode
            self.__logging and logger.debug(
                "Threaded Queue Mode is enabled by default for this connection."
            )

            # define deque and assign it to global var
            self.__queue = deque(maxlen=96)  # max len 96 to check overflow

            # initialize and start threaded recv_handler
            self.__thread = Thread(target=self.__recv_handler, name="NetGear", args=())
            self.__thread.daemon = True
            self.__thread.start()

            if self.__logging:
                # finally log progress
                logger.debug(
                    "Successfully bound to address: {} with pattern: {}.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )
                self.__jpeg_compression and logger.debug(
                    "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                        self.__jpeg_compression_colorspace,
                        self.__jpeg_compression_quality,
                        ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                        (
                            "enabled"
                            if self.__jpeg_compression_fastupsample
                            else "disabled"
                        ),
                    )
                )
                self.__secure_mode and logger.debug(
                    "Successfully enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                logger.debug("Multi-threaded Receive Mode is successfully enabled.")
                logger.debug("Unique System ID is {}.".format(self.__id))
                logger.debug("Receive Mode is now activated.")

        else:
            # otherwise default to `Send Mode`
            # define connection address
            address = "localhost" if address is None else address

            # check if multiserver_mode is enabled
            if self.__multiserver_mode:
                # check if unique server port address is assigned or not in multiserver_mode
                if port is None:
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Server-end. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Server Mode at PORT: {} on this device!".format(
                            port
                        )
                    )
                # assign value to global variable
                self.__port = port
            # check if multiclient_mode is enabled
            elif self.__multiclient_mode:
                # check if unique client port address list/tuple is assigned or not in multiclient_mode
                if port is None or not isinstance(port, (tuple, list)):
                    # raise error if not
                    raise ValueError(
                        "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Client ports while Multi-Client mode is enabled. For more information refer VidGear docs."
                    )
                else:
                    # otherwise log it
                    logger.debug(
                        "Enabling Multi-Client Mode at PORTS: {}!".format(port)
                    )
                # create port address buffer for keeping track of connected client ports
                self.__port_buffer = []
            else:
                # otherwise assign local port address if None
                port = "5555" if port is None else port

            try:
                # define thread-safe messaging socket
                self.__msg_socket = self.__msg_context.socket(msg_pattern[0])

                # if req/rep pattern, define additional flags
                if self.__pattern == 1:
                    self.__msg_socket.REQ_RELAXED = True
                    self.__msg_socket.REQ_CORRELATE = True

                # if pub/sub pattern, define additional optimizer
                if self.__pattern == 2:
                    self.__msg_socket.set_hwm(1)

                # enable specified secure mode for the socket
                if self.__secure_mode > 0:
                    # load client key
                    client_secret_file = os.path.join(
                        self.__auth_secretkeys_dir, "client.key_secret"
                    )
                    client_public, client_secret = auth.load_certificate(
                        client_secret_file
                    )
                    # load all CURVE keys
                    self.__msg_socket.curve_secretkey = client_secret
                    self.__msg_socket.curve_publickey = client_public
                    # load server key
                    server_public_file = os.path.join(
                        self.__auth_publickeys_dir, "server.key"
                    )
                    server_public, _ = auth.load_certificate(server_public_file)
                    # inject public key to make a CURVE connection.
                    self.__msg_socket.curve_serverkey = server_public

                # check if multi-client_mode is enabled
                if self.__multiclient_mode:
                    # connect socket to given protocol, address and client ports
                    for pt in port:
                        self.__msg_socket.connect(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    # handle SSH tunneling if enabled
                    if self.__ssh_tunnel_mode:
                        # establish tunnel connection
                        ssh.tunnel_connection(
                            self.__msg_socket,
                            protocol + "://" + str(address) + ":" + str(port),
                            self.__ssh_tunnel_mode,
                            keyfile=self.__ssh_tunnel_keyfile,
                            password=self.__ssh_tunnel_pwd,
                            paramiko=self.__paramiko_present,
                        )
                    else:
                        # connect socket to given protocol, address and port
                        self.__msg_socket.connect(
                            protocol + "://" + str(address) + ":" + str(port)
                        )

                # additional settings
                if self.__pattern < 2:
                    if self.__multiclient_mode:
                        self.__connection_address = []
                        for pt in port:
                            self.__connection_address.append(
                                protocol + "://" + str(address) + ":" + str(pt)
                            )
                    else:
                        self.__connection_address = (
                            protocol + "://" + str(address) + ":" + str(port)
                        )
                    self.__msg_pattern = msg_pattern[0]
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)

                    self.__logging and logger.debug(
                        "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                            self.__max_retries, self.__request_timeout / 1000
                        )
                    )

            except Exception as e:
                # otherwise log and raise error
                logger.exception(str(e))
                # Handle Secure Mode
                self.__secure_mode and logger.critical(
                    "Failed to activate Secure Mode: `{}` for this connection!".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                # raise errors for exclusive modes
                if self.__multiserver_mode or self.__multiclient_mode:
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Send Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                            (
                                "Multi-Server"
                                if self.__multiserver_mode
                                else "Multi-Client"
                            ),
                            (protocol + "://" + str(address) + ":" + str(port)),
                            pattern,
                        )
                    )
                else:
                    self.__bi_mode and logger.critical(
                        "Failed to activate Bidirectional Mode for this connection!"
                    )
                    self.__ssh_tunnel_mode and logger.critical(
                        "Failed to initiate SSH Tunneling Mode for this server with `{}` back-end!".format(
                            "paramiko" if self.__paramiko_present else "pexpect"
                        )
                    )
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Send Mode failed to connect address: {} and pattern: {}! Kindly recheck all parameters.".format(
                            (protocol + "://" + str(address) + ":" + str(port)), pattern
                        )
                    )

            if self.__logging:
                # finally log progress
                logger.debug(
                    "Successfully connected to address: {} with pattern: {}.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )
                self.__jpeg_compression and logger.debug(
                    "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                        self.__jpeg_compression_colorspace,
                        self.__jpeg_compression_quality,
                        ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                        (
                            "enabled"
                            if self.__jpeg_compression_fastupsample
                            else "disabled"
                        ),
                    )
                )
                self.__secure_mode and logger.debug(
                    "Enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                        valid_security_mech[self.__secure_mode]
                    )
                )
                logger.debug("Unique System ID is {}.".format(self.__id))
                logger.debug(
                    "Send Mode is successfully activated and ready to send data."
                )

    def __recv_handler(self):
        """
        A threaded receiver handler that keeps moving data from the ZMQ socket into an internally monitored deque
        until the thread is terminated or the socket disconnects.
        """
        # initialize variables
        frame = None
        msg_json = None

        # keep looping infinitely until the thread is terminated
        while not self.__terminate:
            # check queue buffer for overflow
            if len(self.__queue) >= 96:
                # pause receiving while the queue is full
                time.sleep(0.000001)
                continue

            if self.__pattern < 2:
                socks = dict(self.__poll.poll(self.__request_timeout * 3))
                if socks.get(self.__msg_socket) == zmq.POLLIN:
                    msg_json = self.__msg_socket.recv_json(
                        flags=self.__msg_flag | zmq.DONTWAIT
                    )
                else:
                    logger.critical("No response from Server(s), Reconnecting again...")
                    self.__msg_socket.close(linger=0)
                    self.__poll.unregister(self.__msg_socket)
                    self.__max_retries -= 1

                    if not (self.__max_retries):
                        if self.__multiserver_mode:
                            logger.error("All Servers seem to be offline, Abandoning!")
                        else:
                            logger.error("Server seems to be offline, Abandoning!")
                        self.__terminate = True
                        continue

                    # Create new connection
                    try:
                        self.__msg_socket = self.__msg_context.socket(
                            self.__msg_pattern
                        )
                        if isinstance(self.__connection_address, list):
                            for _connection in self.__connection_address:
                                self.__msg_socket.bind(_connection)
                        else:
                            self.__msg_socket.bind(self.__connection_address)
                    except Exception as e:
                        logger.exception(str(e))
                        self.__terminate = True
                        raise RuntimeError("API failed to restart the Client-end!")
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)

                    continue
            else:
                try:
                    msg_json = self.__msg_socket.recv_json(flags=self.__msg_flag)
                except zmq.ZMQError as e:
                    if e.errno == zmq.EAGAIN:
                        logger.critical("Connection Timeout. Exiting!")
                        self.__terminate = True
                        self.__queue.append(None)
                        break

            # check if `terminate_flag` received
            if msg_json and msg_json["terminate_flag"]:
                # if multiserver_mode is enabled
                if self.__multiserver_mode:
                    # check which port the signal came from and remove it from the buffer
                    if msg_json["port"] in self.__port_buffer:
                        # if pattern is 1, then notify the server that the termination signal was received
                        if self.__pattern == 1:
                            self.__msg_socket.send_string(
                                "Termination signal successfully received at client!"
                            )
                        self.__port_buffer.remove(msg_json["port"])
                        self.__logging and logger.warning(
                            "Termination signal received from Server at port: {}!".format(
                                msg_json["port"]
                            )
                        )
                    # if termination signal received from all servers then exit client.
                    if not self.__port_buffer:
                        logger.critical(
                            "Termination signal received from all Servers!!!"
                        )
                        self.__terminate = True  # termination
                else:
                    # if pattern is 1, then notify the server that the termination signal was received
                    if self.__pattern == 1:
                        self.__msg_socket.send_string(
                            "Termination signal successfully received at Client's end!"
                        )
                    # termination
                    self.__terminate = True
                    # notify client
                    self.__logging and logger.critical(
                        "Termination signal received from server!"
                    )
                continue

            try:
                msg_data = self.__msg_socket.recv(
                    flags=self.__msg_flag | zmq.DONTWAIT,
                    copy=self.__msg_copy,
                    track=self.__msg_track,
                )
            except zmq.ZMQError as e:
                logger.critical("Socket Session Expired. Exiting!")
                self.__terminate = True
                self.__queue.append(None)
                break

            # handle data transfer in synchronous modes.
            if self.__pattern < 2:
                if self.__bi_mode or self.__multiclient_mode:
                    # check if we are returning `ndarray` frames
                    if isinstance(self.__return_data, np.ndarray):
                        # handle return data for compression
                        return_data = np.copy(self.__return_data)

                        # check whether the return data is C-contiguous
                        if not (return_data.flags["C_CONTIGUOUS"]):
                            # otherwise convert it into a contiguous array
                            return_data = np.ascontiguousarray(
                                return_data, dtype=return_data.dtype
                            )

                        # handle jpeg-compression encoding
                        if self.__jpeg_compression:
                            if self.__jpeg_compression_colorspace == "GRAY":
                                if return_data.ndim == 2:
                                    # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                                    return_data = return_data[:, :, np.newaxis]
                                return_data = simplejpeg.encode_jpeg(
                                    return_data,
                                    quality=self.__jpeg_compression_quality,
                                    colorspace=self.__jpeg_compression_colorspace,
                                    fastdct=self.__jpeg_compression_fastdct,
                                )
                            else:
                                return_data = simplejpeg.encode_jpeg(
                                    return_data,
                                    quality=self.__jpeg_compression_quality,
                                    colorspace=self.__jpeg_compression_colorspace,
                                    colorsubsampling="422",
                                    fastdct=self.__jpeg_compression_fastdct,
                                )

                        return_dict = (
                            dict(port=self.__port)
                            if self.__multiclient_mode
                            else dict()
                        )

                        return_dict.update(
                            dict(
                                return_type=(type(self.__return_data).__name__),
                                compression=(
                                    {
                                        "dct": self.__jpeg_compression_fastdct,
                                        "ups": self.__jpeg_compression_fastupsample,
                                        "colorspace": self.__jpeg_compression_colorspace,
                                    }
                                    if self.__jpeg_compression
                                    else False
                                ),
                                array_dtype=(
                                    str(self.__return_data.dtype)
                                    if not (self.__jpeg_compression)
                                    else ""
                                ),
                                array_shape=(
                                    self.__return_data.shape
                                    if not (self.__jpeg_compression)
                                    else ""
                                ),
                                data=None,
                            )
                        )

                        # send the json dict
                        self.__msg_socket.send_json(
                            return_dict, self.__msg_flag | zmq.SNDMORE
                        )
                        # send the array with correct flags
                        self.__msg_socket.send(
                            return_data,
                            flags=self.__msg_flag,
                            copy=self.__msg_copy,
                            track=self.__msg_track,
                        )
                    else:
                        return_dict = (
                            dict(port=self.__port)
                            if self.__multiclient_mode
                            else dict()
                        )
                        return_dict.update(
                            dict(
                                return_type=(type(self.__return_data).__name__),
                                data=self.__return_data,
                            )
                        )
                        self.__msg_socket.send_json(return_dict, self.__msg_flag)
                else:
                    # send confirmation message to server
                    self.__msg_socket.send_string(
                        "Data received on device: {} !".format(self.__id)
                    )
            else:
                # otherwise warn that return data is ignored for this pattern
                if self.__return_data is not None:
                    logger.warning("`return_data` is disabled for this pattern!")

            # check if encoding was enabled
            if msg_json["compression"]:
                # decode JPEG frame
                frame = simplejpeg.decode_jpeg(
                    msg_data,
                    colorspace=msg_json["compression"]["colorspace"],
                    fastdct=self.__jpeg_compression_fastdct
                    or msg_json["compression"]["dct"],
                    fastupsample=self.__jpeg_compression_fastupsample
                    or msg_json["compression"]["ups"],
                )
                # check if valid frame returned
                if frame is None:
                    self.__terminate = True
                    # otherwise raise error and exit
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Received compressed JPEG frame decoding failed"
                    )
                if msg_json["compression"]["colorspace"] == "GRAY" and frame.ndim == 3:
                    # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                    frame = np.squeeze(frame, axis=2)
            else:
                # recover and reshape frame from buffer
                frame_buffer = np.frombuffer(msg_data, dtype=msg_json["dtype"])
                frame = frame_buffer.reshape(msg_json["shape"])

            # check if multiserver_mode
            if self.__multiserver_mode:
                # save the unique port addresses
                if not msg_json["port"] in self.__port_buffer:
                    self.__port_buffer.append(msg_json["port"])
                # extract if any message from server and display it
                if msg_json["message"]:
                    self.__queue.append((msg_json["port"], msg_json["message"], frame))
                else:
                    # append recovered unique port and frame to queue
                    self.__queue.append((msg_json["port"], frame))
            # extract if any message from server if Bidirectional Mode is enabled
            elif self.__bi_mode:
                if msg_json["message"]:
                    # append grouped frame and data to queue
                    self.__queue.append((msg_json["message"], frame))
                else:
                    self.__queue.append((None, frame))
            else:
                # otherwise append recovered frame to queue
                self.__queue.append(frame)

    def recv(self, return_data=None) -> Optional[NDArray]:
        """
        A Receiver-end method that extracts received frames synchronously from the monitored deque, while maintaining a
        fixed-length frame buffer in memory, and waits (polling) until a frame is available.

        Parameters:
            return_data (any): data _(of any datatype)_ to send back to the Server.

        **Returns:** An n-dimensional numpy array, or a tuple that also carries the port and/or message in exclusive
        modes; `None` once terminated.
        """
        # check whether `receive mode` is activated
        if not (self.__receive_mode):
            # raise value error and exit
            self.__terminate = True
            raise ValueError(
                "[NetGear:ERROR] :: `recv()` function cannot be used while receive_mode is disabled. Kindly refer vidgear docs!"
            )

        # handle Bidirectional return data
        if (self.__bi_mode or self.__multiclient_mode) and not (return_data is None):
            self.__return_data = return_data

        # check whether or not termination flag is enabled
        while not self.__terminate:
            try:
                # return the oldest item if the queue is not empty
                if len(self.__queue) > 0:
                    return self.__queue.popleft()
                else:
                    time.sleep(0.00001)
                    continue
            except KeyboardInterrupt:
                self.__terminate = True
                break
        # otherwise return NoneType
        return None

    def send(self, frame: NDArray, message: Any = None) -> Optional[Any]:
        """
        A Server end method, that sends the data and frames over the network to Client(s).

        Parameters:
            frame (numpy.ndarray): inputs numpy array(frame).
            message (any): input for sending additional data _(of any datatype except `numpy.ndarray`)_ to Client(s).

        **Returns:** Data _(of any datatype)_ in selected exclusive modes, otherwise None-type.

        """
        # check whether `receive_mode` is disabled
        if self.__receive_mode:
            # raise value error and exit
            self.__terminate = True
            raise ValueError(
                "[NetGear:ERROR] :: `send()` function cannot be used while receive_mode is enabled. Kindly refer vidgear docs!"
            )

        if not (message is None) and isinstance(message, np.ndarray):
            logger.warning(
                "Skipped unsupported `message` of datatype: {}!".format(
                    type(message).__name__
                )
            )
            message = None

        # define exit_flag and assign value
        exit_flag = True if (frame is None or self.__terminate) else False

        # if not exiting, check whether the incoming frame is C-contiguous
        if not (exit_flag) and not (frame.flags["C_CONTIGUOUS"]):
            # if it is not, convert it to a contiguous array
            frame = np.ascontiguousarray(frame, dtype=frame.dtype)

        # handle JPEG compression encoding
        if self.__jpeg_compression:
            if self.__jpeg_compression_colorspace == "GRAY":
                if frame.ndim == 2:
                    # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                    frame = np.expand_dims(frame, axis=2)
                frame = simplejpeg.encode_jpeg(
                    frame,
                    quality=self.__jpeg_compression_quality,
                    colorspace=self.__jpeg_compression_colorspace,
                    fastdct=self.__jpeg_compression_fastdct,
                )
            else:
                frame = simplejpeg.encode_jpeg(
                    frame,
                    quality=self.__jpeg_compression_quality,
                    colorspace=self.__jpeg_compression_colorspace,
                    colorsubsampling="422",
                    fastdct=self.__jpeg_compression_fastdct,
                )

        # check if multiserver_mode is activated and assign values with unique port
        msg_dict = dict(port=self.__port) if self.__multiserver_mode else dict()

        # prepare the exclusive json dict
        msg_dict.update(
            dict(
                terminate_flag=exit_flag,
                compression=(
                    {
                        "dct": self.__jpeg_compression_fastdct,
                        "ups": self.__jpeg_compression_fastupsample,
                        "colorspace": self.__jpeg_compression_colorspace,
                    }
                    if self.__jpeg_compression
                    else False
                ),
                message=message,
                pattern=str(self.__pattern),
                dtype=str(frame.dtype) if not (self.__jpeg_compression) else "",
                shape=frame.shape if not (self.__jpeg_compression) else "",
            )
        )

        # send the json dict
        self.__msg_socket.send_json(msg_dict, self.__msg_flag | zmq.SNDMORE)
        # send the frame array with correct flags
        self.__msg_socket.send(
            frame, flags=self.__msg_flag, copy=self.__msg_copy, track=self.__msg_track
        )

        # check if synchronous patterns, then wait for confirmation
        if self.__pattern < 2:
            # check if Bidirectional data transmission is enabled
            if self.__bi_mode or self.__multiclient_mode:
                # handles return data
                recvd_data = None

                socks = dict(self.__poll.poll(self.__request_timeout))
                if socks.get(self.__msg_socket) == zmq.POLLIN:
                    # handle return data
                    recv_json = self.__msg_socket.recv_json(flags=self.__msg_flag)
                else:
                    logger.critical("No response from Client, Reconnecting again...")
                    # Socket is confused. Close and remove it.
                    self.__msg_socket.setsockopt(zmq.LINGER, 0)
                    self.__msg_socket.close()
                    self.__poll.unregister(self.__msg_socket)
                    self.__max_retries -= 1

                    if not (self.__max_retries):
                        if self.__multiclient_mode:
                            logger.error(
                                "All Clients failed to respond on multiple attempts."
                            )
                        else:
                            logger.error(
                                "Client failed to respond on multiple attempts."
                            )
                        self.__terminate = True
                        raise RuntimeError(
                            "[NetGear:ERROR] :: Client(s) seems to be offline, Abandoning."
                        )

                    # Create new connection
                    self.__msg_socket = self.__msg_context.socket(self.__msg_pattern)
                    if isinstance(self.__connection_address, list):
                        for _connection in self.__connection_address:
                            self.__msg_socket.connect(_connection)
                    else:
                        # handle SSH tunneling if enabled
                        if self.__ssh_tunnel_mode:
                            # establish tunnel connection
                            ssh.tunnel_connection(
                                self.__msg_socket,
                                self.__connection_address,
                                self.__ssh_tunnel_mode,
                                keyfile=self.__ssh_tunnel_keyfile,
                                password=self.__ssh_tunnel_pwd,
                                paramiko=self.__paramiko_present,
                            )
                        else:
                            # connect normally
                            self.__msg_socket.connect(self.__connection_address)
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)
                    # return None for the meantime
                    return None

                # save the unique port addresses
                if (
                    self.__multiclient_mode
                    and not recv_json["port"] in self.__port_buffer
                ):
                    self.__port_buffer.append(recv_json["port"])

                if recv_json["return_type"] == "ndarray":
                    recv_array = self.__msg_socket.recv(
                        flags=self.__msg_flag,
                        copy=self.__msg_copy,
                        track=self.__msg_track,
                    )
                    # check if encoding was enabled
                    if recv_json["compression"]:
                        # decode JPEG frame
                        recvd_data = simplejpeg.decode_jpeg(
                            recv_array,
                            colorspace=recv_json["compression"]["colorspace"],
                            fastdct=self.__jpeg_compression_fastdct
                            or recv_json["compression"]["dct"],
                            fastupsample=self.__jpeg_compression_fastupsample
                            or recv_json["compression"]["ups"],
                        )
                        # check if valid frame returned
                        if recvd_data is None:
                            self.__terminate = True
                            # otherwise raise error and exit
                            raise RuntimeError(
                                "[NetGear:ERROR] :: Received compressed frame `{}` decoding failed with flag: {}.".format(
                                    recv_json["compression"],
                                    self.__ex_compression_params,
                                )
                            )

                        if (
                            recv_json["compression"]["colorspace"] == "GRAY"
                            and recvd_data.ndim == 3
                        ):
                            # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                            recvd_data = np.squeeze(recvd_data, axis=2)
                    else:
                        recvd_data = np.frombuffer(
                            recv_array, dtype=recv_json["array_dtype"]
                        ).reshape(recv_json["array_shape"])
                else:
                    recvd_data = recv_json["data"]

                return (
                    (recv_json["port"], recvd_data)
                    if self.__multiclient_mode
                    else recvd_data
                )
            else:
                # otherwise log normally
                socks = dict(self.__poll.poll(self.__request_timeout))
                if socks.get(self.__msg_socket) == zmq.POLLIN:
                    recv_confirmation = self.__msg_socket.recv()
                else:
                    logger.critical("No response from Client, Reconnecting again...")
                    # Socket is confused. Close and remove it.
                    self.__msg_socket.setsockopt(zmq.LINGER, 0)
                    self.__msg_socket.close()
                    self.__poll.unregister(self.__msg_socket)
                    self.__max_retries -= 1

                    if not (self.__max_retries):
                        logger.error("Client failed to respond on repeated attempts.")
                        self.__terminate = True
                        raise RuntimeError(
                            "[NetGear:ERROR] :: Client seems to be offline, Abandoning!"
                        )

                    # Create new connection
                    self.__msg_socket = self.__msg_context.socket(self.__msg_pattern)
                    # handle SSH tunneling if enabled
                    if self.__ssh_tunnel_mode:
                        # establish tunnel connection
                        ssh.tunnel_connection(
                            self.__msg_socket,
                            self.__connection_address,
                            self.__ssh_tunnel_mode,
                            keyfile=self.__ssh_tunnel_keyfile,
                            password=self.__ssh_tunnel_pwd,
                            paramiko=self.__paramiko_present,
                        )
                    else:
                        # connect normally
                        self.__msg_socket.connect(self.__connection_address)
                    self.__poll.register(self.__msg_socket, zmq.POLLIN)
                    return None

                # log confirmation
                self.__logging and logger.debug(recv_confirmation)

    def close(self, kill: bool = False) -> None:
        """
        Safely terminates the threads, and NetGear resources.

        Parameters:
            kill (bool): Kills ZMQ context instead of graceful exiting in receive mode.
        """
        # log it
        self.__logging and logger.debug(
            "Terminating various {} Processes.".format(
                "Receive Mode" if self.__receive_mode else "Send Mode"
            )
        )
        # check whether `receive_mode` is enabled
        if self.__receive_mode:
            # clear the queue if it is not empty
            if not (self.__queue is None) and self.__queue:
                self.__queue.clear()
            # call immediate termination
            self.__terminate = True
            # properly close the socket
            self.__logging and logger.debug("Terminating. Please wait...")
            # Handle Secure Mode Thread
            if self.__z_auth:
                self.__logging and logger.debug("Terminating Authenticator Thread.")
                self.__z_auth.stop()
                while self.__z_auth.is_alive():
                    pass
            # wait until stream resources are released
            # (producer thread might be still grabbing frame)
            if self.__thread is not None:
                self.__logging and logger.debug("Terminating Main Thread.")
                # properly handle thread exit
                if self.__thread.is_alive() and kill:
                    # force close if still alive
                    logger.warning("Thread still running...Killing it forcefully!")
                    self.__msg_context.destroy()
                    self.__thread.join()
                else:
                    self.__msg_socket.close(linger=0)
                    self.__thread.join()
                self.__thread = None
            self.__logging and logger.debug("Terminated Successfully!")
        else:
            # indicate that process should be terminated
            self.__terminate = True
            # log if kill enabled
            kill and logger.warning(
                "`kill` parameter is only available in the receive mode."
            )
            # Handle Secure Mode Thread
            if self.__z_auth:
                self.__logging and logger.debug("Terminating Authenticator Thread.")
                self.__z_auth.stop()
                while self.__z_auth.is_alive():
                    pass
            # check if all attempts of reconnecting failed, then skip to closure
            if (self.__pattern < 2 and not self.__max_retries) or (
                self.__multiclient_mode and not self.__port_buffer
            ):
                try:
                    # properly close the socket
                    self.__msg_socket.setsockopt(zmq.LINGER, 0)
                    self.__msg_socket.close()
                except ZMQError:
                    pass
                finally:
                    # exit
                    return

            if self.__multiserver_mode:
                # check if multiserver_mode
                # send termination flag to client with its unique port
                term_dict = dict(terminate_flag=True, port=self.__port)
            else:
                # otherwise send termination flag to client
                term_dict = dict(terminate_flag=True)

            try:
                if self.__multiclient_mode:
                    for _ in self.__port_buffer:
                        self.__msg_socket.send_json(term_dict)
                else:
                    self.__msg_socket.send_json(term_dict)

                # check for confirmation if available within 1/5 timeout
                if self.__pattern < 2:
                    self.__logging and logger.debug("Terminating. Please wait...")
                    if self.__msg_socket.poll(self.__request_timeout // 5, zmq.POLLIN):
                        self.__msg_socket.recv()
            except Exception as e:
                if not isinstance(e, ZMQError):
                    logger.exception(str(e))
            finally:
                # properly close the socket
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
                self.__logging and logger.debug("Terminated Successfully!")
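
When JPEG compression is off, the listing above sends each frame as a JSON header carrying `dtype` and `shape`, followed by the raw bytes, and the receiver rebuilds it with `np.frombuffer(...).reshape(...)`. The following is a minimal sketch of that round trip without any sockets. `pack_frame` and `unpack_frame` are hypothetical helper names used only for illustration; they are not part of NetGear.

```python
import json

import numpy as np


def pack_frame(frame):
    """Hypothetical helper: return (header_json, payload_bytes) for a raw frame."""
    # the bytes must come from a C-contiguous layout, as send() ensures
    if not frame.flags["C_CONTIGUOUS"]:
        frame = np.ascontiguousarray(frame, dtype=frame.dtype)
    header = {"dtype": str(frame.dtype), "shape": frame.shape}
    return json.dumps(header), frame.tobytes()


def unpack_frame(header_json, payload):
    """Hypothetical helper: rebuild the frame from header and raw bytes."""
    header = json.loads(header_json)
    # JSON turns the shape tuple into a list; reshape() accepts either
    return np.frombuffer(payload, dtype=header["dtype"]).reshape(header["shape"])


frame = np.arange(2 * 3 * 3, dtype=np.uint8).reshape(2, 3, 3)
# a transposed view is not C-contiguous, similar to some sliced video frames
view = frame.transpose(1, 0, 2)

header_json, payload = pack_frame(view)
restored = unpack_frame(header_json, payload)
```

In this sketch `np.frombuffer` wraps an immutable `bytes` object, so `restored` is read-only; call `restored.copy()` before modifying it in place.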

__init__(address=None, port=None, protocol=None, pattern=0, receive_mode=False, logging=False, **options)

This constructor method initializes the object state and attributes of the NetGear class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | str | sets the valid network address of the Server/Client. | None |
| port | str | sets the valid network port of the Server/Client. | None |
| protocol | str | sets the valid messaging protocol between Server/Client. | None |
| pattern | int | sets the supported messaging pattern (flow of communication) between Server/Client. | 0 |
| receive_mode | bool | selects NetGear's mode of operation. | False |
| logging | bool | enables/disables logging. | False |
| options | dict | provides the flexibility to alter various NetGear internal properties. | {} |
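
As a rough illustration of how these parameters interact, the constructor source falls back to pattern `0` and protocol `tcp` on invalid input, and only accepts certain exclusive modes with certain patterns. The sketch below restates those rules as standalone functions. `resolve_settings` and `mode_allowed` are hypothetical names, not NetGear API, and the real constructor raises `ValueError` for an incompatible mode rather than returning a boolean.

```python
VALID_PATTERNS = (0, 1, 2)  # zmq.PAIR, zmq.REQ/zmq.REP, zmq.PUB/zmq.SUB
VALID_PROTOCOLS = ("tcp", "ipc")


def resolve_settings(pattern=0, protocol=None):
    """Hypothetical helper mirroring the constructor's fallback rules."""
    # a non-integer or unknown pattern falls back to 0 (zmq.PAIR)
    if not (isinstance(pattern, int) and pattern in VALID_PATTERNS):
        pattern = 0
    # a missing or unsupported protocol falls back to "tcp"
    if protocol not in VALID_PROTOCOLS:
        protocol = "tcp"
    return pattern, protocol


def mode_allowed(option, pattern):
    """Hypothetical helper: is this exclusive mode usable with this pattern?"""
    if option in ("multiserver_mode", "multiclient_mode"):
        # multi-server and multi-client modes need pattern 1 or 2
        return pattern > 0
    if option == "bidirectional_mode":
        # bidirectional mode needs a synchronous pattern (0 or 1)
        return pattern < 2
    return True


settings = resolve_settings(pattern=5, protocol="udp")
```

Under these rules, pattern `1` (`zmq.REQ/zmq.REP`) is the only one that satisfies both the multi-server/multi-client check and the bidirectional check.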
Source code in vidgear/gears/netgear.py
def __init__(
    self,
    address: str = None,
    port: str = None,
    protocol: str = None,
    pattern: int = 0,
    receive_mode: bool = False,
    logging: bool = False,
    **options: dict
):
    """
    This constructor method initializes the object state and attributes of the NetGear class.

    Parameters:
        address (str): sets the valid network address of the Server/Client.
        port (str): sets the valid Network Port of the Server/Client.
        protocol (str): sets the valid messaging protocol between Server/Client.
        pattern (int): sets the supported messaging pattern (flow of communication) between Server/Client.
        receive_mode (bool): selects NetGear's mode of operation.
        logging (bool): enables/disables logging.
        options (dict): provides the flexibility to alter various NetGear internal properties.
    """
    # enable logging if specified
    self.__logging = logging if isinstance(logging, bool) else False

    # print current version
    logcurr_vidgear_ver(logging=self.__logging)

    # raise error(s) for critical Class imports
    import_dependency_safe(
        "zmq" if zmq is None else "", min_version="4.0", pkg_name="pyzmq"
    )
    import_dependency_safe(
        "simplejpeg" if simplejpeg is None else "", error="log", min_version="1.6.1"
    )

    # define valid messaging patterns => `0`: zmq.PAIR, `1`: (zmq.REQ, zmq.REP), and `2`: (zmq.PUB, zmq.SUB)
    valid_messaging_patterns = {
        0: (zmq.PAIR, zmq.PAIR),
        1: (zmq.REQ, zmq.REP),
        2: (zmq.PUB, zmq.SUB),
    }

    # Handle messaging pattern
    msg_pattern = None
    # check whether user-defined messaging pattern is valid
    if isinstance(pattern, int) and pattern in valid_messaging_patterns.keys():
        # assign value
        msg_pattern = valid_messaging_patterns[pattern]
    else:
        # otherwise default to 0:`zmq.PAIR`
        pattern = 0
        msg_pattern = valid_messaging_patterns[pattern]
        self.__logging and logger.warning(
            "Wrong pattern value, Defaulting to `zmq.PAIR`! Kindly refer Docs for more Information."
        )
    # assign pattern to global parameter for further use
    self.__pattern = pattern

    # Handle messaging protocol
    if protocol is None or not (protocol in ["tcp", "ipc"]):
        # else default to `tcp` protocol
        protocol = "tcp"
        # log it
        self.__logging and logger.warning(
            "Protocol is not supported or not provided. Defaulting to `tcp` protocol!"
        )

    # Handle connection params

    self.__msg_flag = 0  # handles connection flags
    self.__msg_copy = False  # handles whether to copy data
    self.__msg_track = False  # handles whether to track packets

    # Handle NetGear's internal exclusive modes and params

    # define Secure Mode
    self.__z_auth = None

    # define SSH Tunneling Mode
    self.__ssh_tunnel_mode = None  # handles ssh_tunneling mode state
    self.__ssh_tunnel_pwd = None
    self.__ssh_tunnel_keyfile = None
    self.__paramiko_present = False if paramiko is None else True

    # define Multi-Server mode
    self.__multiserver_mode = False  # handles multi-server mode state

    # define Multi-Client mode
    self.__multiclient_mode = False  # handles multi-client mode state

    # define Bidirectional mode
    self.__bi_mode = False  # handles Bidirectional mode state

    # define Secure mode
    valid_security_mech = {0: "Grasslands", 1: "StoneHouse", 2: "IronHouse"}
    self.__secure_mode = 0  # handles ZMQ security layer status
    auth_cert_dir = ""  # handles valid ZMQ certificates dir
    self.__auth_publickeys_dir = ""  # handles valid ZMQ public certificates dir
    self.__auth_secretkeys_dir = ""  # handles valid ZMQ private certificates dir
    overwrite_cert = False  # checks if certificates overwriting allowed
    custom_cert_location = ""  # handles custom ZMQ certificates path

    # define frame-compression handler
    self.__jpeg_compression = (
        True if not (simplejpeg is None) else False
    )  # enabled by default for all connections if simplejpeg is installed
    self.__jpeg_compression_quality = 90  # 90% quality
    self.__jpeg_compression_fastdct = True  # fastest DCT on by default
    self.__jpeg_compression_fastupsample = False  # fastupsample off by default
    self.__jpeg_compression_colorspace = "BGR"  # use BGR colorspace by default

    # defines frame compression on return data
    self.__ex_compression_params = None

    # define receiver return data handler
    self.__return_data = None

    # generate a random 8-character alphanumeric system id
    self.__id = "".join(
        secrets.choice(string.ascii_uppercase + string.digits) for i in range(8)
    )

    # define termination flag
    self.__terminate = False

    # additional settings for reliability
    if pattern < 2:
        # define zmq poller for reliable transmission
        self.__poll = zmq.Poller()
        # define max retries
        self.__max_retries = 3
        # request timeout
        self.__request_timeout = 4000  # 4 secs
    else:
        # subscriber timeout
        self.__subscriber_timeout = None

    # Handle user-defined options dictionary values
    # reformat dictionary
    options = {str(k).strip(): v for k, v in options.items()}

    # loop over dictionary key & values and assign to global variables if valid
    for key, value in options.items():
        # handle multi-server mode
        if key == "multiserver_mode" and isinstance(value, bool):
            # check if valid pattern assigned
            if pattern > 0:
                # activate Multi-server mode
                self.__multiserver_mode = value
            else:
                # otherwise disable it and raise error
                self.__multiserver_mode = False
                logger.critical("Multi-Server Mode is disabled!")
                raise ValueError(
                    "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Server Mode is enabled. Kindly refer Docs for more Information.".format(
                        pattern
                    )
                )

        # handle multi-client mode
        elif key == "multiclient_mode" and isinstance(value, bool):
            # check if valid pattern assigned
            if pattern > 0:
                # activate Multi-client mode
                self.__multiclient_mode = value
            else:
                # otherwise disable it and raise error
                self.__multiclient_mode = False
                logger.critical("Multi-Client Mode is disabled!")
                raise ValueError(
                    "[NetGear:ERROR] :: `{}` pattern is not valid when Multi-Client Mode is enabled. Kindly refer Docs for more Information.".format(
                        pattern
                    )
                )

        # handle bidirectional mode
        elif key == "bidirectional_mode" and isinstance(value, bool):
            # check if pattern is valid
            if pattern < 2:
                # activate Bidirectional mode if specified
                self.__bi_mode = value
            else:
                # otherwise disable it and raise error
                self.__bi_mode = False
                logger.warning("Bidirectional data transmission is disabled!")
                raise ValueError(
                    "[NetGear:ERROR] :: `{}` pattern is not valid when Bidirectional Mode is enabled. Kindly refer Docs for more Information!".format(
                        pattern
                    )
                )

        # handle secure mode
        elif (
            key == "secure_mode"
            and isinstance(value, int)
            and (value in valid_security_mech)
        ):
            self.__secure_mode = value

        elif key == "custom_cert_location" and isinstance(value, str):
            # verify custom auth certificates path for secure mode
            custom_cert_location = os.path.abspath(value)
            assert os.path.isdir(
                custom_cert_location
            ), "[NetGear:ERROR] :: `custom_cert_location` value must be the path to a valid directory!"
            assert check_WriteAccess(
                custom_cert_location,
                is_windows=True if os.name == "nt" else False,
                logging=self.__logging,
            ), "[NetGear:ERROR] :: Permission Denied!, cannot write ZMQ authentication certificates to '{}' directory!".format(
                value
            )
        elif key == "overwrite_cert" and isinstance(value, bool):
            # enable/disable auth certificate overwriting in secure mode
            overwrite_cert = value

        # handle ssh-tunneling mode
        elif key == "ssh_tunnel_mode" and isinstance(value, str):
            # enable SSH Tunneling Mode
            self.__ssh_tunnel_mode = value.strip()
        elif key == "ssh_tunnel_pwd" and isinstance(value, str):
            # add valid SSH Tunneling password
            self.__ssh_tunnel_pwd = value
        elif key == "ssh_tunnel_keyfile" and isinstance(value, str):
            # add valid SSH Tunneling key-file
            self.__ssh_tunnel_keyfile = value if os.path.isfile(value) else None
            if self.__ssh_tunnel_keyfile is None:
                logger.warning(
                    "Discarded invalid or non-existent SSH Tunnel Key-file at {}!".format(
                        value
                    )
                )

        # handle jpeg compression
        elif (
            key == "jpeg_compression"
            and not (simplejpeg is None)
            and isinstance(value, (bool, str))
        ):
            if isinstance(value, str) and value.strip().upper() in [
                "RGB",
                "BGR",
                "RGBX",
                "BGRX",
                "XBGR",
                "XRGB",
                "GRAY",
                "RGBA",
                "BGRA",
                "ABGR",
                "ARGB",
                "CMYK",
            ]:
                # set encoding colorspace
                self.__jpeg_compression_colorspace = value.strip().upper()
                # enable frame-compression encoding value
                self.__jpeg_compression = True
            else:
                # enable frame-compression encoding value
                self.__jpeg_compression = value
        elif key == "jpeg_compression_quality" and isinstance(value, (int, float)):
            # set valid jpeg quality
            if value >= 10 and value <= 100:
                self.__jpeg_compression_quality = int(value)
            else:
                logger.warning("Skipped invalid `jpeg_compression_quality` value!")
        elif key == "jpeg_compression_fastdct" and isinstance(value, bool):
            # enable jpeg fastdct
            self.__jpeg_compression_fastdct = value
        elif key == "jpeg_compression_fastupsample" and isinstance(value, bool):
            # enable jpeg  fastupsample
            self.__jpeg_compression_fastupsample = value

        # assign maximum retries in synchronous patterns
        elif key == "max_retries" and isinstance(value, int) and pattern < 2:
            if value >= 0:
                self.__max_retries = value
            else:
                logger.warning("Invalid `max_retries` value skipped!")

        # assign request timeout in synchronous patterns
        elif key == "request_timeout" and isinstance(value, int) and pattern < 2:
            if value >= 4:
                self.__request_timeout = value * 1000  # convert to milliseconds
            else:
                logger.warning("Invalid `request_timeout` value skipped!")

        # assign subscriber timeout
        elif (
            key == "subscriber_timeout" and isinstance(value, int) and pattern == 2
        ):
            if value > 0:
                self.__subscriber_timeout = value * 1000  # convert to milliseconds
            else:
                logger.warning("Invalid `subscriber_timeout` value skipped!")

        # handle ZMQ flags
        elif key == "flag" and isinstance(value, int):
            self.__msg_flag = value
            self.__msg_flag and logger.warning(
                "The flag optional value is set to `1` (NOBLOCK) for this run. This might cause NetGear to not terminate gracefully."
            )
        elif key == "copy" and isinstance(value, bool):
            self.__msg_copy = value
        elif key == "track" and isinstance(value, bool):
            self.__msg_track = value
            self.__msg_copy and self.__msg_track and logger.info(
                "The `track` optional value will be ignored for this run because `copy=True` is also defined."
            )
        else:
            pass

    # Handle ssh tunneling if enabled
    if not (self.__ssh_tunnel_mode is None):
        # SSH Tunnel Mode only available for server mode
        if receive_mode:
            logger.error("SSH Tunneling cannot be enabled for Client-end!")
        else:
            # check if SSH tunneling possible
            ssh_address = self.__ssh_tunnel_mode
            ssh_address, ssh_port = (
                ssh_address.split(":")
                if ":" in ssh_address
                else [ssh_address, "22"]
            )  # default to port 22
            if "47" in ssh_port:
                self.__ssh_tunnel_mode = self.__ssh_tunnel_mode.replace(
                    ":47", ""
                )  # port-47 is reserved for testing
            else:
                # extract ip for validation
                ssh_user, ssh_ip = (
                    ssh_address.split("@")
                    if "@" in ssh_address
                    else ["", ssh_address]
                )
                # validate ip specified port
                assert check_open_port(
                    ssh_ip, port=int(ssh_port)
                ), "[NetGear:ERROR] :: Host `{}` is not available for SSH Tunneling at port-{}!".format(
                    ssh_address, ssh_port
                )

    # Handle multiple exclusive modes if enabled
    if self.__multiclient_mode and self.__multiserver_mode:
        raise ValueError(
            "[NetGear:ERROR] :: Multi-Client and Multi-Server Mode cannot be enabled simultaneously!"
        )
    elif self.__multiserver_mode or self.__multiclient_mode:
        # check if Bidirectional Mode also enabled
        if self.__bi_mode:
            # log it
            self.__logging and logger.debug(
                "Bidirectional Data Transmission is also enabled for this connection!"
            )
        # check if SSH Tunneling Mode also enabled
        if self.__ssh_tunnel_mode:
            # raise error
            raise ValueError(
                "[NetGear:ERROR] :: SSH Tunneling and {} Mode cannot be enabled simultaneously. Kindly refer docs!".format(
                    "Multi-Server" if self.__multiserver_mode else "Multi-Client"
                )
            )
    elif self.__bi_mode:
        # log Bidirectional mode activation
        self.__logging and logger.debug(
            "Bidirectional Data Transmission is enabled for this connection!"
        )
    elif self.__ssh_tunnel_mode:
        # log SSH Tunneling mode activation
        self.__logging and logger.debug(
            "SSH Tunneling is enabled for host:`{}` with `{}` back-end.".format(
                self.__ssh_tunnel_mode,
                "paramiko" if self.__paramiko_present else "pexpect",
            )
        )

    # On Windows, NetGear requires the ``WindowsSelectorEventLoop``, but Python 3.8 and above
    # default to the ``ProactorEventLoop``, which is not compatible with it. Therefore,
    # the event loop policy is set manually.
    platform.system() == "Windows" and asyncio.set_event_loop_policy(
        asyncio.WindowsSelectorEventLoopPolicy()
    )

    # define ZMQ messaging context instance
    self.__msg_context = zmq.Context.instance()

    # initialize and assign receive mode to global variable
    self.__receive_mode = receive_mode

    # Handle Secure mode
    if self.__secure_mode > 0:
        # activate and log if overwriting is enabled
        if receive_mode:
            # warn before resetting the flag, otherwise the warning can never fire
            overwrite_cert and logger.warning(
                "Overwriting ZMQ Authentication certificates is disabled for Client's end!"
            )
            overwrite_cert = False
        else:
            overwrite_cert and self.__logging and logger.info(
                "Overwriting ZMQ Authentication certificates over previous ones!"
            )

        # Validate certificate generation paths
        # Start threaded authenticator for this context
        try:
            # check if custom certificates path is specified
            if custom_cert_location:
                (
                    auth_cert_dir,
                    self.__auth_secretkeys_dir,
                    self.__auth_publickeys_dir,
                ) = generate_auth_certificates(
                    custom_cert_location, overwrite=overwrite_cert, logging=logging
                )
            else:
                # otherwise auto-generate suitable path
                (
                    auth_cert_dir,
                    self.__auth_secretkeys_dir,
                    self.__auth_publickeys_dir,
                ) = generate_auth_certificates(
                    os.path.join(expanduser("~"), ".vidgear"),
                    overwrite=overwrite_cert,
                    logging=logging,
                )
            # log it
            self.__logging and logger.debug(
                "`{}` is the default location for storing ZMQ authentication certificates/keys.".format(
                    auth_cert_dir
                )
            )

            # start an authenticator for this context
            self.__z_auth = ThreadAuthenticator(self.__msg_context)
            self.__z_auth.start()
            self.__z_auth.allow(str(address))  # allow current address

            # check if `IronHouse` is activated
            if self.__secure_mode == 2:
                # tell authenticator to use the certificate from given valid dir
                self.__z_auth.configure_curve(
                    domain="*", location=self.__auth_publickeys_dir
                )
            else:
                # otherwise tell the authenticator how to handle the CURVE requests, if `StoneHouse` is activated
                self.__z_auth.configure_curve(
                    domain="*", location=auth.CURVE_ALLOW_ANY
                )
        except zmq.ZMQError as e:
            if "Address in use" in str(e):
                logger.info("ZMQ Authenticator already running.")
            else:
                # catch if any error occurred and disable Secure mode
                logger.exception(str(e))
                self.__secure_mode = 0
                logger.error(
                    "ZMQ Security Mechanism is disabled for this connection due to errors!"
                )

    # check whether `receive_mode` is enabled
    if self.__receive_mode:
        # define connection address
        address = "*" if address is None else address

        # check if multiserver_mode is enabled
        if self.__multiserver_mode:
            # check if unique server port address list/tuple is assigned or not in multiserver_mode
            if port is None or not isinstance(port, (tuple, list)):
                # raise error if not
                raise ValueError(
                    "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Server ports while Multi-Server mode is enabled. For more information refer VidGear docs."
                )
            else:
                # otherwise log it
                logger.debug(
                    "Enabling Multi-Server Mode at PORTS: {}!".format(port)
                )
            # create port address buffer for keeping track of connected client's port(s)
            self.__port_buffer = []
        # check if multiclient_mode is enabled
        elif self.__multiclient_mode:
            # check if unique server port address is assigned or not in multiclient_mode
            if port is None:
                # raise error if not
                raise ValueError(
                    "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Client-end. For more information refer VidGear docs."
                )
            else:
                # otherwise log it
                logger.debug(
                    "Enabling Multi-Client Mode at PORT: {} on this device!".format(
                        port
                    )
                )
            # assign value to global variable
            self.__port = port
        else:
            # otherwise assign local port address if None
            port = "5555" if port is None else port

        try:
            # define thread-safe messaging socket
            self.__msg_socket = self.__msg_context.socket(msg_pattern[1])

            # define pub-sub flag
            self.__pattern == 2 and self.__msg_socket.set_hwm(1)

            # enable specified secure mode for the socket
            if self.__secure_mode > 0:
                # load server key
                server_secret_file = os.path.join(
                    self.__auth_secretkeys_dir, "server.key_secret"
                )
                server_public, server_secret = auth.load_certificate(
                    server_secret_file
                )
                # load  all CURVE keys
                self.__msg_socket.curve_secretkey = server_secret
                self.__msg_socket.curve_publickey = server_public
                # enable CURVE connection for this socket
                self.__msg_socket.curve_server = True

            # define exclusive socket options for `patterns=2`
            if self.__pattern == 2:
                self.__msg_socket.setsockopt_string(zmq.SUBSCRIBE, "")
                self.__subscriber_timeout and self.__msg_socket.setsockopt(
                    zmq.RCVTIMEO, self.__subscriber_timeout
                )
                self.__subscriber_timeout and self.__msg_socket.setsockopt(
                    zmq.LINGER, 0
                )

            # if multiserver_mode is enabled, then assign port addresses to zmq socket
            if self.__multiserver_mode:
                # bind socket to given server protocol, address and ports
                for pt in port:
                    self.__msg_socket.bind(
                        protocol + "://" + str(address) + ":" + str(pt)
                    )
            else:
                # bind socket to given protocol, address and port normally
                self.__msg_socket.bind(
                    protocol + "://" + str(address) + ":" + str(port)
                )

            # additional settings
            if pattern < 2:
                if self.__multiserver_mode:
                    self.__connection_address = []
                    for pt in port:
                        self.__connection_address.append(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    self.__connection_address = (
                        protocol + "://" + str(address) + ":" + str(port)
                    )
                self.__msg_pattern = msg_pattern[1]
                self.__poll.register(self.__msg_socket, zmq.POLLIN)
                self.__logging and logger.debug(
                    "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                        self.__max_retries, self.__request_timeout / 1000
                    )
                )
            else:
                self.__logging and self.__subscriber_timeout and logger.debug(
                    "Timeout: {} secs is enabled for this system.".format(
                        self.__subscriber_timeout / 1000
                    )
                )

        except Exception as e:
            # otherwise log and raise error
            logger.exception(str(e))
            # Handle Secure Mode
            self.__secure_mode and logger.critical(
                "Failed to activate Secure Mode: `{}` for this connection!".format(
                    valid_security_mech[self.__secure_mode]
                )
            )
            # raise errors for exclusive modes
            if self.__multiserver_mode or self.__multiclient_mode:
                raise RuntimeError(
                    "[NetGear:ERROR] :: Receive Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                        (
                            "Multi-Server"
                            if self.__multiserver_mode
                            else "Multi-Client"
                        ),
                        (protocol + "://" + str(address) + ":" + str(port)),
                        pattern,
                    )
                )
            else:
                self.__bi_mode and logger.critical(
                    "Failed to activate Bidirectional Mode for this connection!"
                )
                raise RuntimeError(
                    "[NetGear:ERROR] :: Receive Mode failed to bind address: {} and pattern: {}! Kindly recheck all parameters.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )

        # Handle threaded queue mode
        self.__logging and logger.debug(
            "Threaded Queue Mode is enabled by default for this connection."
        )

        # define deque and assign it to global var
        self.__queue = deque(maxlen=96)  # max len 96 to check overflow

        # initialize and start threaded recv_handler
        self.__thread = Thread(target=self.__recv_handler, name="NetGear", args=())
        self.__thread.daemon = True
        self.__thread.start()

        if self.__logging:
            # finally log progress
            logger.debug(
                "Successfully bound to address: {} with pattern: {}.".format(
                    (protocol + "://" + str(address) + ":" + str(port)), pattern
                )
            )
            self.__jpeg_compression and logger.debug(
                "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                    self.__jpeg_compression_colorspace,
                    self.__jpeg_compression_quality,
                    ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                    (
                        "enabled"
                        if self.__jpeg_compression_fastupsample
                        else "disabled"
                    ),
                )
            )
            self.__secure_mode and logger.debug(
                "Successfully enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                    valid_security_mech[self.__secure_mode]
                )
            )
            logger.debug("Multi-threaded Receive Mode is successfully enabled.")
            logger.debug("Unique System ID is {}.".format(self.__id))
            logger.debug("Receive Mode is now activated.")

    else:
        # otherwise default to `Send Mode`
        # define connection address
        address = "localhost" if address is None else address

        # check if multiserver_mode is enabled
        if self.__multiserver_mode:
            # check if unique server port address is assigned or not in multiserver_mode
            if port is None:
                # raise error if not
                raise ValueError(
                    "[NetGear:ERROR] :: Kindly provide a unique & valid port value at Server-end. For more information refer VidGear docs."
                )
            else:
                # otherwise log it
                logger.debug(
                    "Enabling Multi-Server Mode at PORT: {} on this device!".format(
                        port
                    )
                )
            # assign value to global variable
            self.__port = port
        # check if multiclient_mode is enabled
        elif self.__multiclient_mode:
            # check if unique client port address list/tuple is assigned or not in multiclient_mode
            if port is None or not isinstance(port, (tuple, list)):
                # raise error if not
                raise ValueError(
                    "[NetGear:ERROR] :: Incorrect port value! Kindly provide a list/tuple of Client ports while Multi-Client mode is enabled. For more information refer VidGear docs."
                )
            else:
                # otherwise log it
                logger.debug(
                    "Enabling Multi-Client Mode at PORTS: {}!".format(port)
                )
            # create port address buffer for keeping track of connected client ports
            self.__port_buffer = []
        else:
            # otherwise assign local port address if None
            port = "5555" if port is None else port

        try:
            # define thread-safe messaging socket
            self.__msg_socket = self.__msg_context.socket(msg_pattern[0])

            # if req/rep pattern, define additional flags
            if self.__pattern == 1:
                self.__msg_socket.REQ_RELAXED = True
                self.__msg_socket.REQ_CORRELATE = True

            # if pub/sub pattern, define additional optimizer
            if self.__pattern == 2:
                self.__msg_socket.set_hwm(1)

            # enable specified secure mode for the socket
            if self.__secure_mode > 0:
                # load client key
                client_secret_file = os.path.join(
                    self.__auth_secretkeys_dir, "client.key_secret"
                )
                client_public, client_secret = auth.load_certificate(
                    client_secret_file
                )
                # load  all CURVE keys
                self.__msg_socket.curve_secretkey = client_secret
                self.__msg_socket.curve_publickey = client_public
                # load server key
                server_public_file = os.path.join(
                    self.__auth_publickeys_dir, "server.key"
                )
                server_public, _ = auth.load_certificate(server_public_file)
                # inject public key to make a CURVE connection.
                self.__msg_socket.curve_serverkey = server_public

            # check if multi-client_mode is enabled
            if self.__multiclient_mode:
                # connect socket to given protocol, address and each client port
                for pt in port:
                    self.__msg_socket.connect(
                        protocol + "://" + str(address) + ":" + str(pt)
                    )
            else:
                # handle SSH tunneling if enabled
                if self.__ssh_tunnel_mode:
                    # establish tunnel connection
                    ssh.tunnel_connection(
                        self.__msg_socket,
                        protocol + "://" + str(address) + ":" + str(port),
                        self.__ssh_tunnel_mode,
                        keyfile=self.__ssh_tunnel_keyfile,
                        password=self.__ssh_tunnel_pwd,
                        paramiko=self.__paramiko_present,
                    )
                else:
                    # connect socket to given protocol, address and port
                    self.__msg_socket.connect(
                        protocol + "://" + str(address) + ":" + str(port)
                    )

            # additional settings
            if pattern < 2:
                if self.__multiclient_mode:
                    self.__connection_address = []
                    for pt in port:
                        self.__connection_address.append(
                            protocol + "://" + str(address) + ":" + str(pt)
                        )
                else:
                    self.__connection_address = (
                        protocol + "://" + str(address) + ":" + str(port)
                    )
                self.__msg_pattern = msg_pattern[0]
                self.__poll.register(self.__msg_socket, zmq.POLLIN)

                self.__logging and logger.debug(
                    "Reliable transmission is enabled for this pattern with max-retries: {} and timeout: {} secs.".format(
                        self.__max_retries, self.__request_timeout / 1000
                    )
                )

        except Exception as e:
            # otherwise log and raise error
            logger.exception(str(e))
            # Handle Secure Mode
            self.__secure_mode and logger.critical(
                "Failed to activate Secure Mode: `{}` for this connection!".format(
                    valid_security_mech[self.__secure_mode]
                )
            )
            # raise errors for exclusive modes
            if self.__multiserver_mode or self.__multiclient_mode:
                raise RuntimeError(
                    "[NetGear:ERROR] :: Send Mode failed to activate {} Mode at address: {} with pattern: {}! Kindly recheck all parameters.".format(
                        (
                            "Multi-Server"
                            if self.__multiserver_mode
                            else "Multi-Client"
                        ),
                        (protocol + "://" + str(address) + ":" + str(port)),
                        pattern,
                    )
                )
            else:
                self.__bi_mode and logger.critical(
                    "Failed to activate Bidirectional Mode for this connection!"
                )
                self.__ssh_tunnel_mode and logger.critical(
                    "Failed to initiate SSH Tunneling Mode for this server with `{}` back-end!".format(
                        "paramiko" if self.__paramiko_present else "pexpect"
                    )
                )
                raise RuntimeError(
                    "[NetGear:ERROR] :: Send Mode failed to connect address: {} and pattern: {}! Kindly recheck all parameters.".format(
                        (protocol + "://" + str(address) + ":" + str(port)), pattern
                    )
                )

        if self.__logging:
            # finally log progress
            logger.debug(
                "Successfully connected to address: {} with pattern: {}.".format(
                    (protocol + "://" + str(address) + ":" + str(port)), pattern
                )
            )
            self.__jpeg_compression and logger.debug(
                "JPEG Frame-Compression is activated for this connection with Colorspace:`{}`, Quality:`{}`%, Fastdct:`{}`, and Fastupsample:`{}`.".format(
                    self.__jpeg_compression_colorspace,
                    self.__jpeg_compression_quality,
                    ("enabled" if self.__jpeg_compression_fastdct else "disabled"),
                    (
                        "enabled"
                        if self.__jpeg_compression_fastupsample
                        else "disabled"
                    ),
                )
            )
            self.__secure_mode and logger.debug(
                "Enabled ZMQ Security Mechanism: `{}` for this connection.".format(
                    valid_security_mech[self.__secure_mode]
                )
            )
            logger.debug("Unique System ID is {}.".format(self.__id))
            logger.debug(
                "Send Mode is successfully activated and ready to send data."
            )

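The retry and timeout handling in the constructor listing above can be condensed into a small standalone sketch. The helper name `normalize_timeouts` and the returned key names are hypothetical and are not part of NetGear; only the checks themselves (pattern restrictions, lower bounds, and the seconds-to-milliseconds conversion) follow the listing. Pattern `2` is taken to be publish/subscribe, as the listing's comments indicate.

```python
from typing import Any, Dict


def normalize_timeouts(options: Dict[str, Any], pattern: int) -> Dict[str, int]:
    """Hypothetical helper mirroring the retry/timeout checks in the listing."""
    settings: Dict[str, int] = {}
    for key, value in options.items():
        if key == "max_retries" and isinstance(value, int) and pattern < 2:
            # synchronous patterns only; negative values are skipped
            if value >= 0:
                settings["max_retries"] = value
        elif key == "request_timeout" and isinstance(value, int) and pattern < 2:
            # given in seconds, stored in milliseconds; values below 4 are skipped
            if value >= 4:
                settings["request_timeout_ms"] = value * 1000
        elif key == "subscriber_timeout" and isinstance(value, int) and pattern == 2:
            # publish/subscribe only; must be positive
            if value > 0:
                settings["subscriber_timeout_ms"] = value * 1000
    return settings


# REQ/REP pattern: both synchronous options are accepted
print(normalize_timeouts({"max_retries": 3, "request_timeout": 5}, pattern=1))
# PUB/SUB pattern: `request_timeout` is ignored, `subscriber_timeout` is kept
print(normalize_timeouts({"request_timeout": 2, "subscriber_timeout": 10}, pattern=2))
```

Options that fail a type or pattern check fall through silently, which matches the `else: pass` branch of the listing.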
close(kill=False)

Safely terminates the threads and releases NetGear resources.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| kill | bool | Kills ZMQ context instead of graceful exiting in receive mode. | False |

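Because `close()` takes no required arguments, one way to guarantee it runs is the standard library's `contextlib.closing`. The sketch below is illustrative only: `drain` and `FakeClient` are hypothetical names, and `FakeClient` stands in for a receive-mode NetGear object so the example runs without a network.

```python
from contextlib import closing
from typing import Any, Callable, List


def drain(client: Any, handle: Callable[[Any], None]) -> int:
    """Consume frames until recv() returns None; close() runs even on error."""
    count = 0
    # closing() calls client.close() with no arguments when the block exits
    with closing(client):
        while True:
            frame = client.recv()
            if frame is None:
                break
            handle(frame)
            count += 1
    return count


class FakeClient:
    """Hypothetical stand-in exposing the same recv()/close() surface."""

    def __init__(self, frames: List[Any]) -> None:
        self._frames = list(frames)
        self.closed = False

    def recv(self) -> Any:
        return self._frames.pop(0) if self._frames else None

    def close(self, kill: bool = False) -> None:
        self.closed = True


client = FakeClient(["frame-1", "frame-2"])
seen: List[Any] = []
print(drain(client, seen.append), client.closed)
```

`closing` always calls `close()` with the default `kill=False`; a forced shutdown would need an explicit `try`/`finally` that calls `close(kill=True)`.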
Source code in vidgear/gears/netgear.py
def close(self, kill: bool = False) -> None:
    """
    Safely terminates the threads and releases NetGear resources.

    Parameters:
        kill (bool): Kills ZMQ context instead of graceful exiting in receive mode.
    """
    # log it
    self.__logging and logger.debug(
        "Terminating various {} Processes.".format(
            "Receive Mode" if self.__receive_mode else "Send Mode"
        )
    )
    # check whether `receive_mode` is enabled or not
    if self.__receive_mode:
        # clear the queue if it is not empty
        if not (self.__queue is None) and self.__queue:
            self.__queue.clear()
        # call immediate termination
        self.__terminate = True
        # properly close the socket
        self.__logging and logger.debug("Terminating. Please wait...")
        # Handle Secure Mode Thread
        if self.__z_auth:
            self.__logging and logger.debug("Terminating Authenticator Thread.")
            self.__z_auth.stop()
            while self.__z_auth.is_alive():
                pass
        # wait until stream resources are released
        # (producer thread might be still grabbing frame)
        if self.__thread is not None:
            self.__logging and logger.debug("Terminating Main Thread.")
            # properly handle thread exit
            if self.__thread.is_alive() and kill:
                # force close if still alive
                logger.warning("Thread still running...Killing it forcefully!")
                self.__msg_context.destroy()
                self.__thread.join()
            else:
                self.__msg_socket.close(linger=0)
                self.__thread.join()
            self.__thread = None
        self.__logging and logger.debug("Terminated Successfully!")
    else:
        # indicate that process should be terminated
        self.__terminate = True
        # log if kill enabled
        kill and logger.warning(
            "`kill` parameter is only available in the receive mode."
        )
        # Handle Secure Mode Thread
        if self.__z_auth:
            self.__logging and logger.debug("Terminating Authenticator Thread.")
            self.__z_auth.stop()
            while self.__z_auth.is_alive():
                pass
        # check if all attempts of reconnecting failed, then skip to closure
        if (self.__pattern < 2 and not self.__max_retries) or (
            self.__multiclient_mode and not self.__port_buffer
        ):
            try:
                # properly close the socket
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
            except ZMQError:
                pass
            finally:
                # exit
                return

        if self.__multiserver_mode:
            # check if multiserver_mode
            # send termination flag to client with its unique port
            term_dict = dict(terminate_flag=True, port=self.__port)
        else:
            # otherwise send termination flag to client
            term_dict = dict(terminate_flag=True)

        try:
            if self.__multiclient_mode:
                for _ in self.__port_buffer:
                    self.__msg_socket.send_json(term_dict)
            else:
                self.__msg_socket.send_json(term_dict)

            # check for confirmation if available within 1/5 timeout
            if self.__pattern < 2:
                self.__logging and logger.debug("Terminating. Please wait...")
                if self.__msg_socket.poll(self.__request_timeout // 5, zmq.POLLIN):
                    self.__msg_socket.recv()
        except Exception as e:
            if not isinstance(e, ZMQError):
                logger.exception(str(e))
        finally:
            # properly close the socket
            self.__msg_socket.setsockopt(zmq.LINGER, 0)
            self.__msg_socket.close()
            self.__logging and logger.debug("Terminated Successfully!")

recv(return_data=None)

A Receiver end method, that extracts received frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| return_data | any | inputs return data (of any datatype), for sending back to Server. | None |

Returns: An n-dimensional numpy array.
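The polling behaviour described above can be illustrated with a miniature, standard-library-only model. `FrameBuffer` is a hypothetical class, not NetGear's implementation: it keeps only the bounded deque, the termination flag, and the short sleep between polls that the source listing for `recv()` shows.

```python
import threading
import time
from collections import deque
from typing import Any, Optional


class FrameBuffer:
    """Hypothetical miniature of the receive-side buffer; not NetGear's class."""

    def __init__(self, maxlen: int = 96) -> None:
        self._queue: deque = deque(maxlen=maxlen)
        self._terminate = False

    def put(self, frame: Any) -> None:
        # called by the background producer; a bounded deque drops its oldest
        # item when full, and this sketch adds no back-pressure of its own
        self._queue.append(frame)

    def stop(self) -> None:
        self._terminate = True

    def recv(self) -> Optional[Any]:
        # poll until a frame is available or termination is requested
        while not self._terminate:
            if self._queue:
                return self._queue.popleft()
            time.sleep(0.00001)
        return None


buffer = FrameBuffer()


def produce() -> None:
    for i in range(3):
        buffer.put(i)
        time.sleep(0.01)


producer = threading.Thread(target=produce, daemon=True)
producer.start()
received = [buffer.recv() for _ in range(3)]
producer.join()
buffer.stop()
print(received, buffer.recv())
```

Once the termination flag is set, `recv()` returns `None` instead of waiting, which is the signal a consumer loop can use to exit.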

Source code in vidgear/gears/netgear.py
def recv(self, return_data=None) -> Optional[NDArray]:
    """
    A Receiver end method, that extracts received frames synchronously from monitored deque, while maintaining a
    fixed-length frame buffer in the memory, and blocks the thread if the deque is full.

    Parameters:
        return_data (any): inputs return data _(of any datatype)_, for sending back to Server.

    **Returns:** An n-dimensional numpy array.
    """
    # check whether `receive mode` is activated
    if not (self.__receive_mode):
        # raise value error and exit
        self.__terminate = True
        raise ValueError(
            "[NetGear:ERROR] :: `recv()` function cannot be used while receive_mode is disabled. Kindly refer vidgear docs!"
        )

    # handle Bidirectional return data
    if (self.__bi_mode or self.__multiclient_mode) and not (return_data is None):
        self.__return_data = return_data

    # check whether or not termination flag is enabled
    while not self.__terminate:
        try:
            # return the oldest frame if the queue is not empty
            if len(self.__queue) > 0:
                return self.__queue.popleft()
            else:
                time.sleep(0.00001)
                continue
        except KeyboardInterrupt:
            self.__terminate = True
            break
    # otherwise return NoneType
    return None

send(frame, message=None)

A Server-end method that sends frames, along with optional additional data, over the network to the Client(s).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| frame | ndarray | Input frame as a NumPy array. | required |
| message | any | Additional data (of any datatype except numpy.ndarray) to send to the Client(s). | None |

Returns: Data (of any datatype) sent back by the Client when Bidirectional or Multi-Clients mode is enabled (as a (port, data) tuple in Multi-Clients mode); otherwise None.
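In the source listing for send(), a JSON metadata dictionary is sent first and the raw frame buffer follows as a second message part. The sketch below illustrates that header-then-payload idea under simplifying assumptions: it uses plain `bytes` and `memoryview` in place of NumPy arrays and ZeroMQ sockets, handles only 8-bit data, and the helper names `pack_frame` and `unpack_frame` are hypothetical, not part of NetGear.

```python
import json


def pack_frame(pixels, shape, message=None, terminate=False):
    """Build a two-part message: a JSON header followed by the raw frame bytes."""
    header = {
        "terminate_flag": terminate,
        "message": message,
        "dtype": "uint8",  # assumption: this sketch only handles 8-bit frames
        "shape": list(shape),
    }
    return [json.dumps(header).encode("utf-8"), bytes(pixels)]


def unpack_frame(parts):
    """Rebuild the frame from the header's shape (a frombuffer/reshape analogue)."""
    header = json.loads(parts[0].decode("utf-8"))
    # reinterpret the flat byte buffer as a C-contiguous N-D view
    view = memoryview(parts[1]).cast("B", shape=header["shape"])
    return header, view.tolist()


# a 2x3 grayscale "frame" sent together with an extra text message
parts = pack_frame(bytes(range(6)), shape=(2, 3), message="hello")
header, frame = unpack_frame(parts)
```

Carrying the shape and dtype in the header is what lets the receiving side turn a flat byte buffer back into a multi-dimensional frame without any prior agreement on frame size.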

Source code in vidgear/gears/netgear.py
def send(self, frame: NDArray, message: Any = None) -> Optional[Any]:
    """
    A Server end method, that sends the data and frames over the network to Client(s).

    Parameters:
        frame (numpy.ndarray): inputs numpy array(frame).
        message (any): input for sending additional data _(of any datatype except `numpy.ndarray`)_ to Client(s).

    **Returns:** Data _(of any datatype)_ in selected exclusive modes, otherwise None-type.

    """
    # check whether `receive_mode` is disabled
    if self.__receive_mode:
        # raise value error and exit
        self.__terminate = True
        raise ValueError(
            "[NetGear:ERROR] :: `send()` function cannot be used while receive_mode is enabled. Kindly refer vidgear docs!"
        )

    if not (message is None) and isinstance(message, np.ndarray):
        logger.warning(
            "Skipped unsupported `message` of datatype: {}!".format(
                type(message).__name__
            )
        )
        message = None

    # define exit_flag and assign value
    exit_flag = True if (frame is None or self.__terminate) else False

    # when not terminating, make sure the outgoing frame is C-contiguous
    if not (exit_flag) and not (frame.flags["C_CONTIGUOUS"]):
        # copy the frame into a contiguous block of memory
        frame = np.ascontiguousarray(frame, dtype=frame.dtype)

    # handle JPEG compression encoding
    if self.__jpeg_compression:
        if self.__jpeg_compression_colorspace == "GRAY":
            if frame.ndim == 2:
                # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                frame = np.expand_dims(frame, axis=2)
            frame = simplejpeg.encode_jpeg(
                frame,
                quality=self.__jpeg_compression_quality,
                colorspace=self.__jpeg_compression_colorspace,
                fastdct=self.__jpeg_compression_fastdct,
            )
        else:
            frame = simplejpeg.encode_jpeg(
                frame,
                quality=self.__jpeg_compression_quality,
                colorspace=self.__jpeg_compression_colorspace,
                colorsubsampling="422",
                fastdct=self.__jpeg_compression_fastdct,
            )

    # check if multiserver_mode is activated and assign values with unique port
    msg_dict = dict(port=self.__port) if self.__multiserver_mode else dict()

    # prepare the exclusive json dict
    msg_dict.update(
        dict(
            terminate_flag=exit_flag,
            compression=(
                {
                    "dct": self.__jpeg_compression_fastdct,
                    "ups": self.__jpeg_compression_fastupsample,
                    "colorspace": self.__jpeg_compression_colorspace,
                }
                if self.__jpeg_compression
                else False
            ),
            message=message,
            pattern=str(self.__pattern),
            dtype=str(frame.dtype) if not (self.__jpeg_compression) else "",
            shape=frame.shape if not (self.__jpeg_compression) else "",
        )
    )

    # send the json dict
    self.__msg_socket.send_json(msg_dict, self.__msg_flag | zmq.SNDMORE)
    # send the frame array with correct flags
    self.__msg_socket.send(
        frame, flags=self.__msg_flag, copy=self.__msg_copy, track=self.__msg_track
    )

    # check if synchronous patterns, then wait for confirmation
    if self.__pattern < 2:
        # check if Bidirectional data transmission is enabled
        if self.__bi_mode or self.__multiclient_mode:
            # handles return data
            recvd_data = None

            socks = dict(self.__poll.poll(self.__request_timeout))
            if socks.get(self.__msg_socket) == zmq.POLLIN:
                # handle return data
                recv_json = self.__msg_socket.recv_json(flags=self.__msg_flag)
            else:
                logger.critical("No response from Client, Reconnecting again...")
                # Socket is confused. Close and remove it.
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
                self.__poll.unregister(self.__msg_socket)
                self.__max_retries -= 1

                if not (self.__max_retries):
                    if self.__multiclient_mode:
                        logger.error(
                            "All Clients failed to respond on multiple attempts."
                        )
                    else:
                        logger.error(
                            "Client failed to respond on multiple attempts."
                        )
                    self.__terminate = True
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Client(s) seems to be offline, Abandoning."
                    )

                # Create new connection
                self.__msg_socket = self.__msg_context.socket(self.__msg_pattern)
                if isinstance(self.__connection_address, list):
                    for _connection in self.__connection_address:
                        self.__msg_socket.connect(_connection)
                else:
                    # handle SSH tunneling if enabled
                    if self.__ssh_tunnel_mode:
                        # establish tunnel connection
                        ssh.tunnel_connection(
                            self.__msg_socket,
                            self.__connection_address,
                            self.__ssh_tunnel_mode,
                            keyfile=self.__ssh_tunnel_keyfile,
                            password=self.__ssh_tunnel_pwd,
                            paramiko=self.__paramiko_present,
                        )
                    else:
                        # connect normally
                        self.__msg_socket.connect(self.__connection_address)
                self.__poll.register(self.__msg_socket, zmq.POLLIN)
                # return None in the meantime
                return None

            # save the unique port addresses
            if (
                self.__multiclient_mode
                and not recv_json["port"] in self.__port_buffer
            ):
                self.__port_buffer.append(recv_json["port"])

            if recv_json["return_type"] == "ndarray":
                recv_array = self.__msg_socket.recv(
                    flags=self.__msg_flag,
                    copy=self.__msg_copy,
                    track=self.__msg_track,
                )
                # check if encoding was enabled
                if recv_json["compression"]:
                    # decode JPEG frame
                    recvd_data = simplejpeg.decode_jpeg(
                        recv_array,
                        colorspace=recv_json["compression"]["colorspace"],
                        fastdct=self.__jpeg_compression_fastdct
                        or recv_json["compression"]["dct"],
                        fastupsample=self.__jpeg_compression_fastupsample
                        or recv_json["compression"]["ups"],
                    )
                    # check if valid frame returned
                    if recvd_data is None:
                        self.__terminate = True
                        # otherwise raise error and exit
                        raise RuntimeError(
                            "[NetGear:ERROR] :: Received compressed frame `{}` decoding failed with flag: {}.".format(
                                recv_json["compression"],
                                self.__ex_compression_params,
                            )
                        )

                    if (
                        recv_json["compression"]["colorspace"] == "GRAY"
                        and recvd_data.ndim == 3
                    ):
                        # patch for https://gitlab.com/jfolz/simplejpeg/-/issues/11
                        recvd_data = np.squeeze(recvd_data, axis=2)
                else:
                    recvd_data = np.frombuffer(
                        recv_array, dtype=recv_json["array_dtype"]
                    ).reshape(recv_json["array_shape"])
            else:
                recvd_data = recv_json["data"]

            return (
                (recv_json["port"], recvd_data)
                if self.__multiclient_mode
                else recvd_data
            )
        else:
            # otherwise log normally
            socks = dict(self.__poll.poll(self.__request_timeout))
            if socks.get(self.__msg_socket) == zmq.POLLIN:
                recv_confirmation = self.__msg_socket.recv()
            else:
                logger.critical("No response from Client, Reconnecting again...")
                # Socket is confused. Close and remove it.
                self.__msg_socket.setsockopt(zmq.LINGER, 0)
                self.__msg_socket.close()
                self.__poll.unregister(self.__msg_socket)
                self.__max_retries -= 1

                if not (self.__max_retries):
                    logger.error("Client failed to respond on repeated attempts.")
                    self.__terminate = True
                    raise RuntimeError(
                        "[NetGear:ERROR] :: Client seems to be offline, Abandoning!"
                    )

                # Create new connection
                self.__msg_socket = self.__msg_context.socket(self.__msg_pattern)
                # handle SSH tunneling if enabled
                if self.__ssh_tunnel_mode:
                    # establish tunnel connection
                    ssh.tunnel_connection(
                        self.__msg_socket,
                        self.__connection_address,
                        self.__ssh_tunnel_mode,
                        keyfile=self.__ssh_tunnel_keyfile,
                        password=self.__ssh_tunnel_pwd,
                        paramiko=self.__paramiko_present,
                    )
                else:
                    # connect normally
                    self.__msg_socket.connect(self.__connection_address)
                self.__poll.register(self.__msg_socket, zmq.POLLIN)
                return None

            # log confirmation
            self.__logging and logger.debug(recv_confirmation)
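
The reconnection logic in send() follows the Lazy Pirate idea: poll for a reply with a timeout, discard and recreate the connection when nothing arrives, and give up after a fixed number of attempts. The following is a minimal standard-library sketch of that control flow, not NetGear's code. `FakeChannel` and `request_with_retries` are hypothetical names, and unlike the listing (which returns None and retries on the next send() call) this sketch loops internally until it succeeds or gives up.

```python
import queue


class FakeChannel:
    """Hypothetical stand-in for a socket: it replies only if `alive` is True."""

    def __init__(self, alive):
        self._replies = queue.Queue()
        self._alive = alive
        self.closed = False

    def send(self, request):
        # a live peer acknowledges immediately; a dead one stays silent
        if self._alive:
            self._replies.put(b"ack:" + request)

    def poll_reply(self, timeout):
        # return a reply, or None if nothing arrives within `timeout` seconds
        try:
            return self._replies.get(timeout=timeout)
        except queue.Empty:
            return None

    def close(self):
        self.closed = True


def request_with_retries(make_channel, request, timeout=0.01, max_retries=3):
    """Poll for a reply; on timeout, discard the channel and reconnect."""
    channel = make_channel()
    retries_left = max_retries
    while True:
        channel.send(request)
        reply = channel.poll_reply(timeout)
        if reply is not None:
            channel.close()
            return reply
        # no reply: close the stale channel before trying again
        channel.close()
        retries_left -= 1
        if retries_left == 0:
            raise RuntimeError("peer seems to be offline, abandoning")
        channel = make_channel()


# the peer comes online on the third connection attempt
attempts = iter([False, False, True])
reply = request_with_retries(lambda: FakeChannel(next(attempts)), b"frame-1")

# a peer that never answers exhausts the retry budget
try:
    request_with_retries(lambda: FakeChannel(False), b"frame-2", max_retries=2)
    gave_up = False
except RuntimeError:
    gave_up = True
```

Recreating the channel on every timeout mirrors the reason given in the listing's comments: after a missed reply the old socket is treated as unusable, so it is closed and replaced rather than reused.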

 
