CamGear Examples⚓
Synchronizing Two Sources in CamGear⚓
In this example both streams and corresponding frames will be processed synchronously i.e. with no delay:
Using same source with more than one instances of CamGear can lead to Global Interpreter Lock (GIL) that degrades performance even when it is not a bottleneck.
# import required libraries
from vidgear.gears import CamGear
import cv2
import time
# define and start the stream on first source ( For e.g #0 index device)
stream1 = CamGear(source=0, logging=True).start()
# define and start the stream on second source ( For e.g #1 index device)
stream2 = CamGear(source=1, logging=True).start()
# infinite loop
while True:
frameA = stream1.read()
# read frames from stream1
frameB = stream2.read()
# read frames from stream2
# check if any of two frame is None
if frameA is None or frameB is None:
#if True break the infinite loop
break
# do something with both frameA and frameB here
cv2.imshow("Output Frame1", frameA)
cv2.imshow("Output Frame2", frameB)
# Show output window of stream1 and stream 2 separately
key = cv2.waitKey(1) & 0xFF
# check for 'q' key-press
if key == ord("q"):
#if 'q' key-pressed break out
break
if key == ord("w"):
#if 'w' key-pressed save both frameA and frameB at same time
cv2.imwrite("Image-1.jpg", frameA)
cv2.imwrite("Image-2.jpg", frameB)
#break #uncomment this line to break out after taking images
cv2.destroyAllWindows()
# close output window
# safely close both video streams
stream1.stop()
stream2.stop()
Using variable yt_dlp
parameters in CamGear⚓
CamGear provides exclusive attributes STREAM_RESOLUTION
(for specifying stream resolution) & STREAM_PARAMS
(for specifying underlying API(e.g. yt_dlp
) parameters) with its options
dictionary parameter.
The complete usage example is as follows:
More information on STREAM_RESOLUTION
& STREAM_PARAMS
attributes can be found here ➶
# import required libraries
from vidgear.gears import CamGear
import cv2
# specify attributes
options = {"STREAM_RESOLUTION": "720p", "STREAM_PARAMS": {"nocheckcertificate": True}}
# Add YouTube Video URL as input source (for e.g https://youtu.be/bvetuLwJIkA)
# and enable Stream Mode (`stream_mode = True`)
stream = CamGear(
source="https://youtu.be/bvetuLwJIkA", stream_mode=True, logging=True, **options
).start()
# loop over
while True:
# read frames from stream
frame = stream.read()
# check for frame if Nonetype
if frame is None:
break
# {do something with the frame here}
# Show output window
cv2.imshow("Output", frame)
# check for 'q' key if pressed
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
break
# close output window
cv2.destroyAllWindows()
# safely close video stream
stream.stop()
Using CamGear for capturing RTSP/RTMP URLs⚓
You can open any network stream (such as RTSP/RTMP) just by providing its URL directly to CamGear's source
parameter.
Here's a high-level wrapper code around CamGear API to enable auto-reconnection during capturing:
New in v0.2.2
This example was added in v0.2.2
.
Enforcing UDP stream
You can easily enforce UDP for RTSP streams inplace of default TCP, by putting following lines of code on the top of your existing code:
# import required libraries
import os
# enforce UDP
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;udp"
Finally, use backend
parameter value as backend=cv2.CAP_FFMPEG
in CamGear.
from vidgear.gears import CamGear
import cv2
import datetime
import time
class Reconnecting_CamGear:
def __init__(self, cam_address, reset_attempts=50, reset_delay=5):
self.cam_address = cam_address
self.reset_attempts = reset_attempts
self.reset_delay = reset_delay
self.source = CamGear(source=self.cam_address).start()
self.running = True
def read(self):
if self.source is None:
return None
if self.running and self.reset_attempts > 0:
frame = self.source.read()
if frame is None:
self.source.stop()
self.reset_attempts -= 1
print(
"Re-connection Attempt-{} occured at time:{}".format(
str(self.reset_attempts),
datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"),
)
)
time.sleep(self.reset_delay)
self.source = CamGear(source=self.cam_address).start()
# return previous frame
return self.frame
else:
self.frame = frame
return frame
else:
return None
def stop(self):
self.running = False
self.reset_attempts = 0
self.frame = None
if not self.source is None:
self.source.stop()
if __name__ == "__main__":
# open any valid video stream
stream = Reconnecting_CamGear(
cam_address="rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov",
reset_attempts=20,
reset_delay=5,
)
# loop over
while True:
# read frames from stream
frame = stream.read()
# check for frame if None-type
if frame is None:
break
# {do something with the frame here}
# Show output window
cv2.imshow("Output", frame)
# check for 'q' key if pressed
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
break
# close output window
cv2.destroyAllWindows()
# safely close video stream
stream.stop()