Helper Methods
logger_handler¶
Returns the logger handler
Returns: A logger handler
Source code in vidgear/gears/helper.py
def logger_handler():
"""
## logger_handler
Returns the logger handler
**Returns:** A logger handler
"""
# logging formatter
formatter = ColoredFormatter(
"{green}{asctime}{reset} :: {bold_purple}{name:^13}{reset} :: {log_color}{levelname:^8}{reset} :: {bold_white}{message}",
datefmt="%H:%M:%S",
reset=True,
log_colors={
"INFO": "bold_cyan",
"DEBUG": "bold_yellow",
"WARNING": "bold_red,fg_thin_yellow",
"ERROR": "bold_red",
"CRITICAL": "bold_red,bg_white",
},
style="{",
)
# check if VIDGEAR_LOGFILE defined
file_mode = os.environ.get("VIDGEAR_LOGFILE", False)
# define handler
handler = log.StreamHandler()
if file_mode and isinstance(file_mode, str):
file_path = os.path.abspath(file_mode)
if (os.name == "nt" or os.access in os.supports_effective_ids) and os.access(
os.path.dirname(file_path), os.W_OK
):
file_path = (
os.path.join(file_path, "vidgear.log")
if os.path.isdir(file_path)
else file_path
)
handler = log.FileHandler(file_path, mode="a")
formatter = log.Formatter(
"{asctime} :: {name} :: {levelname} :: {message}",
datefmt="%H:%M:%S",
style="{",
)
handler.setFormatter(formatter)
return handler
check_CV_version¶
Returns: OpenCV's version first bit
check_gstreamer_support¶
Checks whether OpenCV is compiled with Gstreamer(>=1.0.0
) support.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logging | bool | enables logging for its operations | False |
Returns: A Boolean value
Source code in vidgear/gears/helper.py
def check_gstreamer_support(logging: bool = False) -> bool:
"""
## check_gstreamer_support
Checks whether OpenCV is compiled with Gstreamer(`>=1.0.0`) support.
Parameters:
logging (bool): enables logging for its operations
**Returns:** A Boolean value
"""
raw = cv2.getBuildInformation()
gst = [
x.strip()
for x in raw.split("\n")
if x and re.search(r"GStreamer[,-:]+\s*(?:YES|NO)", x)
]
if gst and "YES" in gst[0]:
version = re.search(r"(\d+\.)?(\d+\.)?(\*|\d+)", gst[0])
logging and logger.debug("Found GStreamer version:{}".format(version[0]))
return version[0] >= "1.0.0"
else:
logger.warning("GStreamer not found!")
return False
get_supported_resolution¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | string | value to be validated | required |
logging | bool | enables logging for its operations | False |
Returns: Valid stream resolution
Source code in vidgear/gears/helper.py
def get_supported_resolution(value: str, logging=False) -> str:
"""
## get_supported_resolution
Parameters:
value (string): value to be validated
logging (bool): enables logging for its operations
**Returns:** Valid stream resolution
"""
# default to best
stream_resolution = "best"
supported_stream_qualities = [
"144p",
"240p",
"360p",
"480p",
"720p",
"1080p",
"1440p",
"2160p",
"4320p",
"worst",
"best",
]
if isinstance(value, str):
if value.strip().lower() in supported_stream_qualities:
stream_resolution = value.strip().lower()
logging and logger.debug(
"Selecting `{}` resolution for streams.".format(stream_resolution)
)
else:
logger.warning(
"Specified stream-resolution `{}` is not supported. Reverting to `best`!".format(
value
)
)
else:
logger.warning(
"Specified stream-resolution `{}` is Invalid. Reverting to `best`!".format(
value
)
)
return stream_resolution
dimensions_to_resolutions¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | list | list of dimensions (e.g. | required |
Returns: list of resolutions (e.g. 360p
)
Source code in vidgear/gears/helper.py
def dimensions_to_resolutions(value: List[str]) -> List[str]:
"""
## dimensions_to_resolutions
Parameters:
value (list): list of dimensions (e.g. `640x360`)
**Returns:** list of resolutions (e.g. `360p`)
"""
supported_resolutions = {
"256x144": "144p",
"426x240": "240p",
"640x360": "360p",
"854x480": "480p",
"1280x720": "720p",
"1920x1080": "1080p",
"2560x1440": "1440p",
"3840x2160": "2160p",
"7680x4320": "4320p",
}
return (
list(map(supported_resolutions.get, value, value))
if isinstance(value, list)
else []
)
mkdir_safe¶
Safely creates directory at given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path | string | path to the directory | required |
logging | bool | enables logging for its operations | False |
Source code in vidgear/gears/helper.py
def mkdir_safe(dir_path: str, logging: bool = False) -> None:
"""
## mkdir_safe
Safely creates directory at given path.
Parameters:
dir_path (string): path to the directory
logging (bool): enables logging for its operations
"""
try:
os.makedirs(dir_path)
logging and logger.debug("Created directory at `{}`".format(dir_path))
except (OSError, IOError) as e:
if e.errno != errno.EACCES and e.errno != errno.EEXIST:
raise
delete_ext_safe¶
Safely deletes files with given extensions at given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path | string | path to the directory | required |
extensions | list | list of extensions to be deleted | [] |
logging | bool | enables logging for its operations | False |
Source code in vidgear/gears/helper.py
def delete_ext_safe(
dir_path: str, extensions: list = [], logging: bool = False
) -> None:
"""
## delete_ext_safe
Safely deletes files with given extensions at given path.
Parameters:
dir_path (string): path to the directory
extensions (list): list of extensions to be deleted
logging (bool): enables logging for its operations
"""
if not extensions or not os.path.exists(dir_path):
logger.warning("Invalid input provided for deleting!")
return
logger.critical("Clearing Assets at `{}`!".format(dir_path))
for ext in extensions:
if len(ext) == 2:
files_ext = [
os.path.join(dir_path, f)
for f in os.listdir(dir_path)
if f.startswith(ext[0]) and f.endswith(ext[1])
]
else:
files_ext = [
os.path.join(dir_path, f)
for f in os.listdir(dir_path)
if f.endswith(ext)
]
for file in files_ext:
delete_file_safe(file)
logging and logger.debug("Deleted file: `{}`".format(file))
capPropId¶
Retrieves the OpenCV property's Integer(Actual) value from string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
property | string | inputs OpenCV property as string. | required |
logging | bool | enables logging for its operations | True |
Returns: Resultant integer value.
Source code in vidgear/gears/helper.py
def capPropId(property: str, logging: bool = True) -> int:
"""
## capPropId
Retrieves the OpenCV property's Integer(Actual) value from string.
Parameters:
property (string): inputs OpenCV property as string.
logging (bool): enables logging for its operations
**Returns:** Resultant integer value.
"""
integer_value = 0
try:
integer_value = getattr(cv2, property)
except Exception as e:
logging and logger.exception(str(e))
logger.critical("`{}` is not a valid OpenCV property!".format(property))
return None
return integer_value
reducer¶
Reduces frame size by given percentage
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame | numpy.ndarray | inputs numpy array(frame). | None |
percentage | int/float | inputs size-reduction percentage. | 0 |
interpolation | int | Change resize interpolation. | 4 |
Returns: A reduced numpy ndarray array.
Source code in vidgear/gears/helper.py
def reducer(
frame: NDArray = None,
percentage: Union[int, float] = 0,
interpolation: int = cv2.INTER_LANCZOS4,
) -> NDArray:
"""
## reducer
Reduces frame size by given percentage
Parameters:
frame (numpy.ndarray): inputs numpy array(frame).
percentage (int/float): inputs size-reduction percentage.
interpolation (int): Change resize interpolation.
**Returns:** A reduced numpy ndarray array.
"""
# check if frame is valid
if frame is None:
raise ValueError("[Helper:ERROR] :: Input frame cannot be NoneType!")
# check if valid reduction percentage is given
if not (percentage > 0 and percentage < 90):
raise ValueError(
"[Helper:ERROR] :: Given frame-size reduction percentage is invalid, Kindly refer docs."
)
if not (isinstance(interpolation, int)):
raise ValueError(
"[Helper:ERROR] :: Given interpolation is invalid, Kindly refer docs."
)
# grab the frame size
(height, width) = frame.shape[:2]
# calculate the ratio of the width from percentage
reduction = ((100 - percentage) / 100) * width
ratio = reduction / float(width)
# construct the dimensions
dimensions = (int(reduction), int(height * ratio))
# return the resized frame
return cv2.resize(frame, dimensions, interpolation=interpolation)
create_blank_frame¶
Create blank frames of given frame size with text
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame | numpy.ndarray | inputs numpy array(frame). | None |
text | str | Text to be written on frame. | '' |
Returns: A reduced numpy ndarray array.
Source code in vidgear/gears/helper.py
def create_blank_frame(
frame: NDArray = None, text: str = "", logging: bool = False
) -> NDArray:
"""
## create_blank_frame
Create blank frames of given frame size with text
Parameters:
frame (numpy.ndarray): inputs numpy array(frame).
text (str): Text to be written on frame.
**Returns:** A reduced numpy ndarray array.
"""
# check if frame is valid
if frame is None or not (isinstance(frame, np.ndarray)):
raise ValueError("[Helper:ERROR] :: Input frame is invalid!")
# grab the frame size
(height, width) = frame.shape[:2]
# create blank frame
blank_frame = np.zeros(frame.shape, frame.dtype)
# setup text
if text and isinstance(text, str):
logging and logger.debug("Adding text: {}".format(text))
# setup font
font = cv2.FONT_HERSHEY_SCRIPT_COMPLEX
# get boundary of this text
fontScale = min(height, width) / (25 / 0.25)
textsize = cv2.getTextSize(text, font, fontScale, 5)[0]
# get coords based on boundary
textX = (width - textsize[0]) // 2
textY = (height + textsize[1]) // 2
# put text
cv2.putText(
blank_frame, text, (textX, textY), font, fontScale, (125, 125, 125), 6
)
# return frame
return blank_frame
dict2Args¶
Converts dictionary attributes to list(args)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
param_dict | dict | Parameters dictionary | required |
Returns: Arguments list
Source code in vidgear/gears/helper.py
def dict2Args(param_dict: dict) -> list:
"""
## dict2Args
Converts dictionary attributes to list(args)
Parameters:
param_dict (dict): Parameters dictionary
**Returns:** Arguments list
"""
args = []
for key in param_dict.keys():
if key in ["-clones"] or key.startswith("-core"):
if isinstance(param_dict[key], list):
args.extend(param_dict[key])
else:
logger.warning(
"{} with invalid datatype:`{}`, Skipped!".format(
"Core parameter" if key.startswith("-core") else "Clone",
param_dict[key],
)
)
else:
args.append(key)
args.append(str(param_dict[key]))
return args
get_valid_ffmpeg_path¶
Validate the given FFmpeg path/binaries, and returns a valid FFmpeg executable path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
custom_ffmpeg | string | path to custom FFmpeg executables | '' |
is_windows | boolean | is running on Windows OS? | False |
ffmpeg_download_path | string | FFmpeg static binaries download location (Windows only) | '' |
logging | bool | enables logging for its operations | False |
Returns: A valid FFmpeg executable path string.
Source code in vidgear/gears/helper.py
def get_valid_ffmpeg_path(
custom_ffmpeg: str = "",
is_windows: bool = False,
ffmpeg_download_path: str = "",
logging: bool = False,
) -> Union[str, bool]:
"""
## get_valid_ffmpeg_path
Validate the given FFmpeg path/binaries, and returns a valid FFmpeg executable path.
Parameters:
custom_ffmpeg (string): path to custom FFmpeg executables
is_windows (boolean): is running on Windows OS?
ffmpeg_download_path (string): FFmpeg static binaries download location _(Windows only)_
logging (bool): enables logging for its operations
**Returns:** A valid FFmpeg executable path string.
"""
final_path = ""
if is_windows:
# checks if current os is windows
if custom_ffmpeg:
# if custom FFmpeg path is given assign to local variable
final_path += custom_ffmpeg
else:
# otherwise auto-download them
try:
if not (ffmpeg_download_path):
# otherwise save to Temp Directory
import tempfile
ffmpeg_download_path = tempfile.gettempdir()
logging and logger.debug(
"FFmpeg Windows Download Path: {}".format(ffmpeg_download_path)
)
# download Binaries
os_bit = (
("win64" if platform.machine().endswith("64") else "win32")
if is_windows
else ""
)
_path = download_ffmpeg_binaries(
path=ffmpeg_download_path, os_windows=is_windows, os_bit=os_bit
)
# assign to local variable
final_path += _path
except Exception as e:
# log if any error occurred
logger.exception(str(e))
logger.error(
"Error in downloading FFmpeg binaries, Check your network and Try again!"
)
return False
if os.path.isfile(final_path):
# check if valid FFmpeg file exist
pass
elif os.path.isfile(os.path.join(final_path, "ffmpeg.exe")):
# check if FFmpeg directory exists, if does, then check for valid file
final_path = os.path.join(final_path, "ffmpeg.exe")
else:
# else return False
logging and logger.debug(
"No valid FFmpeg executables found at Custom FFmpeg path!"
)
return False
else:
# otherwise perform test for Unix
if custom_ffmpeg:
# if custom FFmpeg path is given assign to local variable
if os.path.isfile(custom_ffmpeg):
# check if valid FFmpeg file exist
final_path += custom_ffmpeg
elif os.path.isfile(os.path.join(custom_ffmpeg, "ffmpeg")):
# check if FFmpeg directory exists, if does, then check for valid file
final_path = os.path.join(custom_ffmpeg, "ffmpeg")
else:
# else return False
logging and logger.debug(
"No valid FFmpeg executables found at Custom FFmpeg path!"
)
return False
else:
# otherwise assign ffmpeg binaries from system
final_path += "ffmpeg"
logging and logger.debug("Final FFmpeg Path: {}".format(final_path))
# Final Auto-Validation for FFmeg Binaries. returns final path if test is passed
return final_path if validate_ffmpeg(final_path, logging=logging) else False
download_ffmpeg_binaries¶
Generates FFmpeg Static Binaries for windows(if not available)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | path for downloading custom FFmpeg executables | required |
os_windows | boolean | is running on Windows OS? | False |
os_bit | string | 32-bit or 64-bit OS? | '' |
Returns: A valid FFmpeg executable path string.
Source code in vidgear/gears/helper.py
def download_ffmpeg_binaries(
path: str, os_windows: bool = False, os_bit: str = ""
) -> str:
"""
## download_ffmpeg_binaries
Generates FFmpeg Static Binaries for windows(if not available)
Parameters:
path (string): path for downloading custom FFmpeg executables
os_windows (boolean): is running on Windows OS?
os_bit (string): 32-bit or 64-bit OS?
**Returns:** A valid FFmpeg executable path string.
"""
final_path = ""
if os_windows and os_bit:
# initialize with available FFmpeg Static Binaries GitHub Server
file_url = "https://github.com/abhiTronix/FFmpeg-Builds/releases/latest/download/ffmpeg-static-{}-gpl.zip".format(
os_bit
)
file_name = os.path.join(
os.path.abspath(path), "ffmpeg-static-{}-gpl.zip".format(os_bit)
)
file_path = os.path.join(
os.path.abspath(path), "ffmpeg-static-{}-gpl/bin/ffmpeg.exe".format(os_bit),
)
base_path, _ = os.path.split(file_name) # extract file base path
# check if file already exists
if os.path.isfile(file_path):
final_path += file_path # skip download if does
else:
# import libs
import zipfile
# check if given path has write access
assert os.access(path, os.W_OK), (
"[Helper:ERROR] :: Permission Denied, Cannot write binaries to directory = "
+ path
)
# remove leftovers if exists
os.path.isfile(file_name) and delete_file_safe(file_name)
# download and write file to the given path
with open(file_name, "wb") as f:
logger.debug(
"No Custom FFmpeg path provided. Auto-Installing FFmpeg static binaries from GitHub Mirror now. Please wait..."
)
# create session
with requests.Session() as http:
# setup retry strategy
retries = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
# Mount it for https usage
adapter = TimeoutHTTPAdapter(timeout=2.0, max_retries=retries)
http.mount("https://", adapter)
response = http.get(file_url, stream=True)
response.raise_for_status()
total_length = (
response.headers.get("content-length")
if "content-length" in response.headers
else len(response.content)
)
assert not (
total_length is None
), "[Helper:ERROR] :: Failed to retrieve files, check your Internet connectivity!"
bar = tqdm(total=int(total_length), unit="B", unit_scale=True)
for data in response.iter_content(chunk_size=4096):
f.write(data)
len(data) > 0 and bar.update(len(data))
bar.close()
logger.debug("Extracting executables.")
with zipfile.ZipFile(file_name, "r") as zip_ref:
zip_fname, _ = os.path.split(zip_ref.infolist()[0].filename)
zip_ref.extractall(base_path)
# perform cleaning
delete_file_safe(file_name)
logger.debug("FFmpeg binaries for Windows configured successfully!")
final_path += file_path
# return final path
return final_path
validate_ffmpeg¶
Validate FFmeg Binaries. returns True
if tests are passed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of FFmpeg binaries | required |
logging | bool | enables logging for its operations | False |
Returns: A boolean value, confirming whether tests passed, or not?.
Source code in vidgear/gears/helper.py
def validate_ffmpeg(path: str, logging: bool = False) -> bool:
"""
## validate_ffmpeg
Validate FFmeg Binaries. returns `True` if tests are passed.
Parameters:
path (string): absolute path of FFmpeg binaries
logging (bool): enables logging for its operations
**Returns:** A boolean value, confirming whether tests passed, or not?.
"""
try:
# get the FFmpeg version
version = check_output([path, "-version"])
firstline = version.split(b"\n")[0]
version = firstline.split(b" ")[2].strip()
# log if test are passed
logging and logger.info("FFmpeg validity Test Passed!")
logging and logger.debug(
"Found valid FFmpeg Version: `{}` installed on this system".format(version)
)
except Exception as e:
# log if test are failed
logging and logger.exception(str(e))
logger.error("FFmpeg validity Test Failed!")
return False
return True
check_output¶
Returns stdin output from subprocess module
Source code in vidgear/gears/helper.py
def check_output(*args: Union[list, tuple], **kwargs: dict) -> bytes:
"""
## check_output
Returns stdin output from subprocess module
"""
# import libs
import subprocess as sp
# workaround for python bug: https://bugs.python.org/issue37380
if platform.system() == "Windows":
# see comment https://bugs.python.org/msg370334
sp._cleanup = lambda: None
# handle additional params
retrieve_stderr = kwargs.pop("force_retrieve_stderr", False)
# execute command in subprocess
process = sp.Popen(
stdout=sp.PIPE,
stderr=sp.DEVNULL if not (retrieve_stderr) else sp.PIPE,
*args,
**kwargs,
)
output, stderr = process.communicate()
retcode = process.poll()
# handle return code
if retcode and not (retrieve_stderr):
cmd = kwargs.get("args")
if cmd is None:
cmd = args[0]
error = sp.CalledProcessError(retcode, cmd)
error.output = output
raise error
return output if not (retrieve_stderr) else stderr
generate_auth_certificates¶
Auto-Generates, and Auto-validates CURVE ZMQ key-pairs for NetGear API's Secure Mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | path for generating CURVE key-pairs | required |
overwrite | boolean | overwrite existing key-pairs or not? | False |
logging | bool | enables logging for its operations | False |
Returns: A valid CURVE key-pairs path as string.
Source code in vidgear/gears/helper.py
def generate_auth_certificates(
path: str, overwrite: bool = False, logging: bool = False
) -> tuple:
"""
## generate_auth_certificates
Auto-Generates, and Auto-validates CURVE ZMQ key-pairs for NetGear API's Secure Mode.
Parameters:
path (string): path for generating CURVE key-pairs
overwrite (boolean): overwrite existing key-pairs or not?
logging (bool): enables logging for its operations
**Returns:** A valid CURVE key-pairs path as string.
"""
# import necessary lib
import zmq.auth
# check if path corresponds to vidgear only
if os.path.basename(path) != ".vidgear":
path = os.path.join(path, ".vidgear")
# generate keys dir
keys_dir = os.path.join(path, "keys")
mkdir_safe(keys_dir, logging=logging)
# generate separate public and private key dirs
public_keys_dir = os.path.join(keys_dir, "public_keys")
secret_keys_dir = os.path.join(keys_dir, "private_keys")
# check if overwriting is allowed
if overwrite:
# delete previous certificates
for dirs in [public_keys_dir, secret_keys_dir]:
if os.path.exists(dirs):
shutil.rmtree(dirs)
mkdir_safe(dirs, logging=logging)
# generate new keys
server_public_file, server_secret_file = zmq.auth.create_certificates(
keys_dir, "server"
)
client_public_file, client_secret_file = zmq.auth.create_certificates(
keys_dir, "client"
)
# move keys to their appropriate directory respectively
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key"):
shutil.move(os.path.join(keys_dir, key_file), public_keys_dir)
elif key_file.endswith(".key_secret"):
shutil.move(os.path.join(keys_dir, key_file), secret_keys_dir)
else:
# clean redundant keys if present
redundant_key = os.path.join(keys_dir, key_file)
if os.path.isfile(redundant_key):
delete_file_safe(redundant_key)
else:
# otherwise validate available keys
status_public_keys = validate_auth_keys(public_keys_dir, ".key")
status_private_keys = validate_auth_keys(secret_keys_dir, ".key_secret")
# check if all valid keys are found
if status_private_keys and status_public_keys:
return (keys_dir, secret_keys_dir, public_keys_dir)
# check if valid public keys are found
if not (status_public_keys):
mkdir_safe(public_keys_dir, logging=logging)
# check if valid private keys are found
if not (status_private_keys):
mkdir_safe(secret_keys_dir, logging=logging)
# generate new keys
server_public_file, server_secret_file = zmq.auth.create_certificates(
keys_dir, "server"
)
client_public_file, client_secret_file = zmq.auth.create_certificates(
keys_dir, "client"
)
# move keys to their appropriate directory respectively
for key_file in os.listdir(keys_dir):
if key_file.endswith(".key") and not (status_public_keys):
shutil.move(
os.path.join(keys_dir, key_file), os.path.join(public_keys_dir, ".")
)
elif key_file.endswith(".key_secret") and not (status_private_keys):
shutil.move(
os.path.join(keys_dir, key_file), os.path.join(secret_keys_dir, ".")
)
else:
# clean redundant keys if present
redundant_key = os.path.join(keys_dir, key_file)
if os.path.isfile(redundant_key):
delete_file_safe(redundant_key)
# validate newly generated keys
status_public_keys = validate_auth_keys(public_keys_dir, ".key")
status_private_keys = validate_auth_keys(secret_keys_dir, ".key_secret")
# raise error is validation test fails
if not (status_private_keys) or not (status_public_keys):
raise RuntimeError(
"[Helper:ERROR] :: Unable to generate valid ZMQ authentication certificates at `{}`!".format(
keys_dir
)
)
# finally return valid key paths
return (keys_dir, secret_keys_dir, public_keys_dir)
validate_audio¶
Validates audio by retrieving audio-bitrate from file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of FFmpeg binaries | required |
source | string/list | source to be validated. | None |
Returns: A string value, confirming whether audio is present, or not?.
Source code in vidgear/gears/helper.py
def validate_audio(path: str, source: Union[str, list] = None) -> str:
"""
## validate_audio
Validates audio by retrieving audio-bitrate from file.
Parameters:
path (string): absolute path of FFmpeg binaries
source (string/list): source to be validated.
**Returns:** A string value, confirming whether audio is present, or not?.
"""
if source is None or not (source):
logger.warning("Audio input source is empty!")
return ""
# create ffmpeg command
cmd = [path, "-hide_banner"] + (
source if isinstance(source, list) else ["-i", source]
)
# extract metadata
metadata = check_output(cmd, force_retrieve_stderr=True)
# extract bitrate
audio_bitrate_meta = [
line.strip()
for line in metadata.decode("utf-8").split("\n")
if "Audio:" in line
]
audio_bitrate = (
re.findall(r"([0-9]+)\s(kb|mb|gb)\/s", audio_bitrate_meta[0])[-1]
if audio_bitrate_meta
else ""
)
# extract samplerate
audio_samplerate_metadata = [
line.strip()
for line in metadata.decode("utf-8").split("\n")
if all(x in line for x in ["Audio:", "Hz"])
]
audio_samplerate = (
re.findall(r"[0-9]+\sHz", audio_samplerate_metadata[0])[0]
if audio_samplerate_metadata
else ""
)
# format into actual readable bitrate value
if audio_bitrate:
# return bitrate directly
return "{}{}".format(int(audio_bitrate[0].strip()), audio_bitrate[1].strip()[0])
elif audio_samplerate:
# convert samplerate to bitrate first
sample_rate_value = int(audio_samplerate.split(" ")[0])
channels_value = 1 if "mono" in audio_samplerate_metadata[0] else 2
bit_depth_value = re.findall(
r"(u|s|f)([0-9]+)(le|be)", audio_samplerate_metadata[0]
)[0][1]
return (
(
str(
get_audio_bitrate(
sample_rate_value, channels_value, int(bit_depth_value)
)
)
+ "k"
)
if bit_depth_value
else ""
)
else:
return ""
extract_time¶
Extract time from give string value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | string | string value. | required |
Returns: Time (in seconds) as integer.
Source code in vidgear/gears/helper.py
def extract_time(value: str) -> int:
"""
## extract_time
Extract time from give string value.
Parameters:
value (string): string value.
**Returns:** Time _(in seconds)_ as integer.
"""
if not (value):
logger.warning("Value is empty!")
return 0
else:
stripped_data = value.strip()
t_duration = re.findall(r"\d{2}:\d{2}:\d{2}(?:\.\d{2})?", stripped_data)
return (
sum(
float(x) * 60 ** i
for i, x in enumerate(reversed(t_duration[0].split(":")))
)
if t_duration
else 0
)
validate_video¶
Validates video by retrieving resolution/size and framerate from file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of FFmpeg binaries | required |
video_path | string | absolute path to Video. | None |
Returns: A dictionary of retieved Video resolution (as tuple(width, height)) and framerate (as float).
Source code in vidgear/gears/helper.py
def validate_video(
path: str, video_path: str = None, logging: bool = False
) -> Optional[dict]:
"""
## validate_video
Validates video by retrieving resolution/size and framerate from file.
Parameters:
path (string): absolute path of FFmpeg binaries
video_path (string): absolute path to Video.
**Returns:** A dictionary of retieved Video resolution _(as tuple(width, height))_ and framerate _(as float)_.
"""
if video_path is None or not (video_path):
logger.warning("Video path is empty!")
return None
# extract metadata
metadata = check_output(
[path, "-hide_banner", "-i", video_path], force_retrieve_stderr=True
)
# clean and search
stripped_data = [x.decode("utf-8").strip() for x in metadata.split(b"\n")]
logging and logger.debug(stripped_data)
result = {}
for data in stripped_data:
output_a = re.findall(r"([1-9]\d+)x([1-9]\d+)", data)
output_b = re.findall(r"\d+(?:\.\d+)?\sfps", data)
if len(result) == 2:
break
if output_b and not "framerate" in result:
result["framerate"] = re.findall(r"[\d\.\d]+", output_b[0])[0]
if output_a and not "resolution" in result:
result["resolution"] = output_a[-1]
# return values
return result if (len(result) == 2) else None
is_valid_url¶
Checks URL validity by testing its scheme against FFmpeg's supported protocols
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of FFmpeg binaries | required |
url | string | URL to be validated | None |
logging | bool | enables logging for its operations | False |
Returns: A boolean value, confirming whether tests passed, or not?.
Source code in vidgear/gears/helper.py
def is_valid_url(path: str, url: str = None, logging: bool = False) -> bool:
"""
## is_valid_url
Checks URL validity by testing its scheme against
FFmpeg's supported protocols
Parameters:
path (string): absolute path of FFmpeg binaries
url (string): URL to be validated
logging (bool): enables logging for its operations
**Returns:** A boolean value, confirming whether tests passed, or not?.
"""
if url is None or not (url):
logger.warning("URL is empty!")
return False
# extract URL scheme
extracted_scheme_url = url.split("://", 1)[0]
# extract all FFmpeg supported protocols
protocols = check_output([path, "-hide_banner", "-protocols"])
splitted = [x.decode("utf-8").strip() for x in protocols.split(b"\n")]
supported_protocols = splitted[splitted.index("Output:") + 1 : len(splitted) - 1]
# RTSP is a demuxer somehow
# support both RTSP and RTSPS(over SSL)
supported_protocols += (
["rtsp", "rtsps"] if "rtsp" in get_supported_demuxers(path) else []
)
# Test and return result whether scheme is supported
if extracted_scheme_url and extracted_scheme_url in supported_protocols:
logging and logger.debug(
"URL scheme `{}` is supported by FFmpeg.".format(extracted_scheme_url)
)
return True
else:
logger.warning(
"URL scheme `{}` isn't supported by FFmpeg!".format(extracted_scheme_url)
)
return False
import_dependency_safe¶
Imports specified dependency safely. By default(error = raise
), if a dependency is missing, an ImportError with a meaningful message will be raised. Otherwise if error = log
a warning will be logged and on error = silent
everything will be quit. But If a dependency is present, but older than specified, an error is raised if specified.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | string | name of dependency to be imported. | required |
error | string | raise or Log or silence ImportError. Possible values are | 'raise' |
pkg_name | string | (Optional) package name of dependency(if different | None |
min_version | string | (Optional) required minimum version of the dependency to be imported. | None |
custom_message | string | (Optional) custom Import error message to be raised or logged. | None |
Returns: The imported module, when found and the version is correct(if specified). Otherwise None
.
Source code in vidgear/gears/helper.py
def import_dependency_safe(
name, error="raise", pkg_name=None, min_version=None, custom_message=None,
):
"""
## import_dependency_safe
Imports specified dependency safely. By default(`error = raise`), if a dependency is missing,
an ImportError with a meaningful message will be raised. Otherwise if `error = log` a warning
will be logged and on `error = silent` everything will be quit. But If a dependency is present,
but older than specified, an error is raised if specified.
Parameters:
name (string): name of dependency to be imported.
error (string): raise or Log or silence ImportError. Possible values are `"raise"`, `"log"` and `silent`. Default is `"raise"`.
pkg_name (string): (Optional) package name of dependency(if different `pip` name). Otherwise `name` will be used.
min_version (string): (Optional) required minimum version of the dependency to be imported.
custom_message (string): (Optional) custom Import error message to be raised or logged.
**Returns:** The imported module, when found and the version is correct(if specified). Otherwise `None`.
"""
# check specified parameters
sub_class = ""
if not name or not isinstance(name, str):
return None
else:
# extract name in case of relative import
name = name.strip()
if name.startswith("from"):
name = name.split(" ")
name, sub_class = (name[1].strip(), name[-1].strip())
assert error in [
"raise",
"log",
"silent",
], "[Vidgear:ERROR] :: Invalid value at `error` parameter."
# specify package name of dependency(if defined). Otherwise use name
install_name = pkg_name if not (pkg_name is None) else name
# create message
msg = (
custom_message
if not (custom_message is None)
else "Failed to find required dependency '{}'. Install it with `pip install {}` command.".format(
name, install_name
)
)
# try importing dependency
try:
module = importlib.import_module(name)
module = getattr(module, sub_class) if sub_class else module
except Exception as e:
if error == "raise":
if isinstance(e, ModuleNotFoundError):
# raise message
raise ModuleNotFoundError(msg) from None
else:
# raise error+message
raise ImportError(msg) from e
elif error == "log":
logger.error(msg, exc_info=sys.exc_info())
return None
else:
return None
# check if minimum required version
if not (min_version) is None:
# Handle submodules
parent_module = name.split(".")[0]
if parent_module != name:
# grab parent module
module_to_get = sys.modules[parent_module]
else:
module_to_get = module
# extract version
version = get_module_version(module_to_get)
# verify
if parse_version(version) < parse_version(min_version):
# create message
msg = """Unsupported version '{}' found. Vidgear requires '{}' dependency installed with version '{}' or greater.
Update it with `pip install -U {}` command.""".format(
parent_module, min_version, version, install_name
)
# handle errors.
if error == "silent":
return None
else:
# raise
raise ImportError(msg)
return module
get_video_bitrate¶
Calculate optimum Bitrate from resolution, framerate, bits-per-pixels values
Parameters:
Name | Type | Description | Default |
---|---|---|---|
width | int | video-width | required |
height | int | video-height | required |
fps | float | video-framerate | required |
bpp | float | bit-per-pixels value | required |
Returns: Video bitrate (in Kbps) as integer.
Source code in vidgear/gears/helper.py
def get_video_bitrate(width: int, height: int, fps: float, bpp: float) -> int:
"""
## get_video_bitrate
Calculate optimum Bitrate from resolution, framerate, bits-per-pixels values
Parameters:
width (int): video-width
height (int): video-height
fps (float): video-framerate
bpp (float): bit-per-pixels value
**Returns:** Video bitrate _(in Kbps)_ as integer.
"""
return round((width * height * bpp * fps) / 1000)
check_WriteAccess¶
Checks whether given path directory has Write-Access.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of directory | required |
is_windows | boolean | is running on Windows OS? | False |
logging | bool | enables logging for its operations | False |
Returns: A boolean value, confirming whether Write-Access available, or not?.
Source code in vidgear/gears/helper.py
def check_WriteAccess(
path: str, is_windows: bool = False, logging: bool = False
) -> bool:
"""
## check_WriteAccess
Checks whether given path directory has Write-Access.
Parameters:
path (string): absolute path of directory
is_windows (boolean): is running on Windows OS?
logging (bool): enables logging for its operations
**Returns:** A boolean value, confirming whether Write-Access available, or not?.
"""
# check if path exists
dirpath = Path(path)
try:
if not (dirpath.exists() and dirpath.is_dir()):
logger.warning(
"Specified directory `{}` doesn't exists or valid.".format(path)
)
return False
else:
path = dirpath.resolve()
except:
return False
# check filepath on *nix systems
if not is_windows:
uid = os.geteuid()
gid = os.getegid()
s = os.stat(path)
mode = s[stat.ST_MODE]
return (
((s[stat.ST_UID] == uid) and (mode & stat.S_IWUSR))
or ((s[stat.ST_GID] == gid) and (mode & stat.S_IWGRP))
or (mode & stat.S_IWOTH)
)
# otherwise, check filepath on windows
else:
write_accessible = False
temp_fname = os.path.join(path, "temp.tmp")
try:
fd = os.open(temp_fname, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(fd)
write_accessible = True
except Exception as e:
if isinstance(e, PermissionError):
logger.error(
"You don't have adequate access rights to use `{}` directory!".format(
path
)
)
logging and logger.exception(str(e))
finally:
delete_file_safe(temp_fname)
return write_accessible
check_open_port¶
Checks whether specified port open at given IP address.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address | string | given IP address. | required |
port | int | check if port is open at given address. | 22 |
Returns: A boolean value, confirming whether given port is open at given IP address.
Source code in vidgear/gears/helper.py
def check_open_port(address: str, port: int = 22) -> bool:
"""
## check_open_port
Checks whether specified port open at given IP address.
Parameters:
address (string): given IP address.
port (int): check if port is open at given address.
**Returns:** A boolean value, confirming whether given port is open at given IP address.
"""
if not address:
return False
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
if sock.connect_ex((address, port)) == 0:
return True
else:
return False
delete_ext_safe¶
Safely deletes files at given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | string | path to the file | required |
Source code in vidgear/gears/helper.py
get_supported_demuxers¶
Find and returns FFmpeg's supported demuxers
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of FFmpeg binaries | required |
Returns: List of supported demuxers.
Source code in vidgear/gears/helper.py
def get_supported_demuxers(path: str) -> List[str]:
"""
## get_supported_demuxers
Find and returns FFmpeg's supported demuxers
Parameters:
path (string): absolute path of FFmpeg binaries
**Returns:** List of supported demuxers.
"""
demuxers = check_output([path, "-hide_banner", "-demuxers"])
splitted = [x.decode("utf-8").strip() for x in demuxers.split(b"\n")]
split_index = [idx for idx, s in enumerate(splitted) if "--" in s][0]
supported_demuxers = splitted[split_index + 1 : len(splitted) - 1]
# search all demuxers
outputs = [re.search(r"\s[a-z0-9_,-]{2,}\s", d) for d in supported_demuxers]
outputs = [o.group(0) for o in outputs if o]
# return demuxers output
return [o.strip() if not ("," in o) else o.split(",")[-1].strip() for o in outputs]
get_supported_vencoders¶
Find and returns FFmpeg's supported video encoders
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | absolute path of FFmpeg binaries | required |
Returns: List of supported encoders.
Source code in vidgear/gears/helper.py
def get_supported_vencoders(path: str) -> List[str]:
"""
## get_supported_vencoders
Find and returns FFmpeg's supported video encoders
Parameters:
path (string): absolute path of FFmpeg binaries
**Returns:** List of supported encoders.
"""
encoders = check_output([path, "-hide_banner", "-encoders"])
splitted = encoders.split(b"\n")
# extract video encoders
supported_vencoders = [
x.decode("utf-8").strip()
for x in splitted[2 : len(splitted) - 1]
if x.decode("utf-8").strip().startswith("V")
]
# compile regex
finder = re.compile(r"[A-Z]*[\.]+[A-Z]*\s[a-z0-9_-]*")
# find all outputs
outputs = finder.findall("\n".join(supported_vencoders))
# return output findings
return [[s for s in o.split(" ")][-1] for o in outputs]
validate_auth_keys¶
Validates, and also maintains generated ZMQ CURVE Key-pairs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | string | path of generated CURVE key-pairs | required |
extension | string | type of key-pair to be validated | required |
Returns: A boolean value, confirming whether tests passed, or not?.
Source code in vidgear/gears/helper.py
def validate_auth_keys(path: str, extension: str) -> bool:
"""
## validate_auth_keys
Validates, and also maintains generated ZMQ CURVE Key-pairs.
Parameters:
path (string): path of generated CURVE key-pairs
extension (string): type of key-pair to be validated
**Returns:** A boolean value, confirming whether tests passed, or not?.
"""
# check for valid path
if not (os.path.exists(path)):
return False
# check if directory empty
if not (os.listdir(path)):
return False
keys_buffer = [] # stores auth-keys
# loop over auth-keys
for key_file in os.listdir(path):
key = os.path.splitext(key_file)
# check if valid key is generated
if key and (key[0] in ["server", "client"]) and (key[1] == extension):
keys_buffer.append(key_file) # store it
# remove invalid keys if found
len(keys_buffer) == 1 and delete_file_safe(os.path.join(path, keys_buffer[0]))
# return results
return True if (len(keys_buffer) == 2) else False