VideoGear Examples⚓
Using VideoGear with ROS(Robot Operating System)⚓
We will be using cv_bridge
to convert OpenCV frames to ROS image messages and vice-versa.
In this example, we'll create a node that convert OpenCV frames into ROS image messages, and then publishes them over ROS.
New in v0.2.2
This example was added in v0.2.2
.
This example is vidgear implementation of this wiki example.
# import roslib
import roslib
roslib.load_manifest("my_package")
# import other required libraries
import sys
import rospy
import cv2
from std_msgs.msg import String
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
from vidgear.gears import VideoGear
# custom publisher class
class image_publisher:
def __init__(self, source=0, logging=False):
# create CV bridge
self.bridge = CvBridge()
# define publisher topic
self.image_pub = rospy.Publisher("image_topic_pub", Image)
# open stream with given parameters
self.stream = VideoGear(source=source, logging=logging).start()
# define publisher topic
rospy.Subscriber("image_topic_sub", Image, self.callback)
def callback(self, data):
# {do something with received ROS node data here}
# read frames
frame = self.stream.read()
# check for frame if None-type
if not (frame is None):
# {do something with the frame here}
# publish our frame
try:
self.image_pub.publish(self.bridge.cv2_to_imgmsg(frame, "bgr8"))
except CvBridgeError as e:
# catch any errors
print(e)
def close(self):
# stop stream
self.stream.stop()
def main(args):
# !!! define your own video source here !!!
# Open any video stream such as live webcam
# video stream on first index(i.e. 0) device
# define publisher
ic = image_publisher(source=0, logging=True)
# initiate ROS node on publisher
rospy.init_node("image_publisher", anonymous=True)
try:
# run node
rospy.spin()
except KeyboardInterrupt:
print("Shutting down")
finally:
# close publisher
ic.close()
if __name__ == "__main__":
main(sys.argv)
Using VideoGear for capturing RSTP/RTMP URLs⚓
Here's a high-level wrapper code around VideoGear API to enable auto-reconnection during capturing, plus stabilization is enabled (stabilize=True
) in order to stabilize captured frames on-the-go:
New in v0.2.2
This example was added in v0.2.2
.
Enforcing UDP stream
You can easily enforce UDP for RSTP streams inplace of default TCP, by putting following lines of code on the top of your existing code:
# import required libraries
import os
# enforce UDP
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;udp"
Finally, use backend
parameter value as backend=cv2.CAP_FFMPEG
in VideoGear.
from vidgear.gears import VideoGear
import cv2
import datetime
import time
class Reconnecting_VideoGear:
def __init__(self, cam_address, stabilize=False, reset_attempts=50, reset_delay=5):
self.cam_address = cam_address
self.stabilize = stabilize
self.reset_attempts = reset_attempts
self.reset_delay = reset_delay
self.source = VideoGear(
source=self.cam_address, stabilize=self.stabilize
).start()
self.running = True
def read(self):
if self.source is None:
return None
if self.running and self.reset_attempts > 0:
frame = self.source.read()
if frame is None:
self.source.stop()
self.reset_attempts -= 1
print(
"Re-connection Attempt-{} occured at time:{}".format(
str(self.reset_attempts),
datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"),
)
)
time.sleep(self.reset_delay)
self.source = VideoGear(
source=self.cam_address, stabilize=self.stabilize
).start()
# return previous frame
return self.frame
else:
self.frame = frame
return frame
else:
return None
def stop(self):
self.running = False
self.reset_attempts = 0
self.frame = None
if not self.source is None:
self.source.stop()
if __name__ == "__main__":
# open any valid video stream
stream = Reconnecting_VideoGear(
cam_address="rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov",
reset_attempts=20,
reset_delay=5,
)
# loop over
while True:
# read frames from stream
frame = stream.read()
# check for frame if None-type
if frame is None:
break
# {do something with the frame here}
# Show output window
cv2.imshow("Output", frame)
# check for 'q' key if pressed
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
break
# close output window
cv2.destroyAllWindows()
# safely close video stream
stream.stop()
Using VideoGear for Real-time Stabilization with Audio Encoding⚓
In this example code, we will be directly merging the audio from a Video-File (to be stabilized) with its processed stabilized frames into a compressed video output in real time:
New in v0.2.4
This example was added in v0.2.4
.
Make sure this input video-file (to be stabilized) contains valid audio source, otherwise you could encounter multiple errors or no output at all.
You MUST use -input_framerate
attribute to set exact value of input framerate when using external audio in Real-time Frames mode, otherwise audio delay will occur in output streams.
Use -disable_force_termination
flag when video duration is too short(<60sec), otherwise WriteGear will not produce any valid output.
# import required libraries
from vidgear.gears import WriteGear
from vidgear.gears import VideoGear
import cv2
# Give suitable video file path to be stabilized
unstablized_videofile = "test.mp4"
# open any valid video path with stabilization enabled(`stabilize = True`)
stream_stab = VideoGear(source=unstablized_videofile, stabilize=True, logging=True).start()
# define required FFmpeg optimizing parameters for your writer
output_params = {
"-i": unstablized_videofile,
"-c:a": "aac",
"-input_framerate": stream_stab.framerate,
"-clones": ["-shortest"],
# !!! Uncomment following line if video duration is too short(<60sec). !!!
#"-disable_force_termination": True,
}
# Define writer with defined parameters and suitable output filename for e.g. `Output.mp4
writer = WriteGear(output_filename="Output.mp4", logging=True, **output_params)
# loop over
while True:
# read frames from stream
frame_stab = stream_stab.read()
# check for frame if not grabbed
if frame_stab is None:
break
# {do something with the stabilized frame here}
# write stabilized frame to writer
writer.write(frame_stab)
# safely close streams
stream_stab.stop()
# safely close writer
writer.close()