CamGear Examples⚓
Synchronizing Two Sources in CamGear⚓
In this example, both streams and their corresponding frames are processed synchronously, i.e. with no delay:
Using the same source with more than one instance of CamGear can lead to Global Interpreter Lock (GIL) contention, which degrades performance even when it is not a bottleneck.
# import required libraries
from vidgear.gears import CamGear
import cv2
import time
# open the first source (e.g. the device at index 0) and start it
stream1 = CamGear(source=0, logging=True).start()
# open the second source (e.g. the device at index 1) and start it
stream2 = CamGear(source=1, logging=True).start()

# keep pulling one frame from each stream until a stream ends or the user quits
while True:
    # grab the current frame of stream1, then of stream2, back-to-back
    frameA = stream1.read()
    frameB = stream2.read()

    # leave the loop as soon as either stream yields no frame
    if any(grabbed is None for grabbed in (frameA, frameB)):
        break

    # do something with both frameA and frameB here

    # show each stream in its own output window
    cv2.imshow("Output Frame1", frameA)
    cv2.imshow("Output Frame2", frameB)

    # poll the keyboard once per iteration
    pressed = cv2.waitKey(1) & 0xFF
    if pressed == ord("q"):
        # 'q' quits
        break
    elif pressed == ord("w"):
        # 'w' saves frameA and frameB from the same iteration
        cv2.imwrite("Image-1.jpg", frameA)
        cv2.imwrite("Image-2.jpg", frameB)
        # break  # uncomment this line to break out after taking images

# close the output windows
cv2.destroyAllWindows()

# safely close both video streams
stream1.stop()
stream2.stop()
Using variable yt_dlp parameters in CamGear⚓
CamGear provides the exclusive attributes STREAM_RESOLUTION (for specifying the stream resolution) and STREAM_PARAMS (for specifying parameters of the underlying API, e.g. yt_dlp) through its options dictionary parameter.
The complete usage example is as follows:
More information on STREAM_RESOLUTION & STREAM_PARAMS attributes can be found here ➶
# import required libraries
from vidgear.gears import CamGear
import cv2
# stream attributes: requested resolution plus parameters for the underlying API
options = {
    "STREAM_RESOLUTION": "720p",
    "STREAM_PARAMS": {"nocheckcertificate": True},
}

# use a YouTube video URL as the input source (e.g. https://youtu.be/bvetuLwJIkA)
# and enable Stream Mode (`stream_mode = True`)
stream = CamGear(
    source="https://youtu.be/bvetuLwJIkA",
    stream_mode=True,
    logging=True,
    **options
).start()

# process frames until the stream ends or the user quits
while True:
    # fetch the next frame; None means there is nothing more to read
    frame = stream.read()
    if frame is None:
        break

    # {do something with the frame here}

    # display the frame
    cv2.imshow("Output", frame)

    # quit when 'q' is pressed
    if (cv2.waitKey(1) & 0xFF) == ord("q"):
        break

# close the output window
cv2.destroyAllWindows()

# safely close the video stream
stream.stop()
Using CamGear for capturing RTSP/RTMP URLs⚓
You can open any network stream (such as RTSP/RTMP) just by providing its URL directly to CamGear's source parameter.
Here's a high-level wrapper around the CamGear API that enables auto-reconnection during capture:
New in v0.2.2
This example was added in v0.2.2.
Enforcing UDP stream
You can easily enforce UDP for RTSP streams in place of the default TCP by putting the following lines of code at the top of your existing code:
# import required libraries
import os
# Enforce UDP as the RTSP transport by setting the capture-options
# environment variable (a "key;value" pair) for the process.
# NOTE(review): presumably this must run before the capture is opened - confirm.
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;udp"
Finally, set CamGear's backend parameter to cv2.CAP_FFMPEG (i.e. backend=cv2.CAP_FFMPEG).
from vidgear.gears import CamGear
import cv2
import datetime
import time
class Reconnecting_CamGear:
    """CamGear wrapper that re-opens the stream whenever a read fails.

    Each failed read consumes one re-connection attempt: the current stream
    is stopped, the wrapper sleeps for ``reset_delay`` seconds, and a new
    CamGear stream is started on the same address. Once the attempts are
    used up (or ``stop()`` has been called) ``read()`` returns ``None``.
    """

    def __init__(self, cam_address, reset_attempts=50, reset_delay=5):
        """Open the stream and remember the re-connection settings.

        Args:
            cam_address: Source passed to ``CamGear(source=...)``.
            reset_attempts: Number of re-connections allowed before
                ``read()`` gives up and returns ``None``.
            reset_delay: Seconds to sleep before each re-connection.
        """
        self.cam_address = cam_address
        self.reset_attempts = reset_attempts
        self.reset_delay = reset_delay
        # Last successfully read frame. Initialised up front so read() can
        # fall back to it even when the very first read fails (previously
        # this raised AttributeError).
        self.frame = None
        self.source = CamGear(source=self.cam_address).start()
        self.running = True

    def read(self):
        """Return the next frame, re-connecting if the stream yields none.

        Returns:
            The frame read from the stream. If the read failed, a
            re-connection is performed and the last successfully read frame
            is returned instead (``None`` if no frame was ever read).
            ``None`` is also returned when there is no source, the wrapper
            has been stopped, or no re-connection attempts remain.
        """
        if self.source is None:
            return None
        if not self.running or self.reset_attempts <= 0:
            return None

        frame = self.source.read()
        if frame is not None:
            # remember the frame so it can be re-served during a re-connection
            self.frame = frame
            return frame

        # The read failed: drop the current stream and open a fresh one.
        self.source.stop()
        self.reset_attempts -= 1
        # the number logged is the count of attempts still remaining
        print(
            "Re-connection Attempt-{} occured at time:{}".format(
                str(self.reset_attempts),
                datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"),
            )
        )
        time.sleep(self.reset_delay)
        # NOTE(review): if CamGear raises while the source is still
        # unreachable, that exception propagates to the caller - confirm
        # whether it should be caught and retried here.
        self.source = CamGear(source=self.cam_address).start()
        # return previous frame
        return self.frame

    def stop(self):
        """Stop the wrapper and the underlying stream, if there is one."""
        self.running = False
        self.reset_attempts = 0
        self.frame = None
        if self.source is not None:
            self.source.stop()
if __name__ == "__main__":
    # open any valid video stream through the reconnecting wrapper
    stream = Reconnecting_CamGear(
        cam_address="rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov",
        reset_attempts=20,
        reset_delay=5,
    )

    # process frames until the wrapper returns None or the user quits
    while True:
        # fetch the next frame; None ends the loop
        frame = stream.read()
        if frame is None:
            break

        # {do something with the frame here}

        # display the frame
        cv2.imshow("Output", frame)

        # quit when 'q' is pressed
        if (cv2.waitKey(1) & 0xFF) == ord("q"):
            break

    # close the output window
    cv2.destroyAllWindows()

    # safely close the video stream
    stream.stop()