class FFdecoder:
"""
> FFdecoder API compiles and executes the FFmpeg pipeline inside a subprocess pipe for generating real-time, low-overhead, lightning fast video frames
with robust error-handling in python 🎞️⚡
FFdecoder API implements a **standalone highly-extensible wrapper around [FFmpeg](https://ffmpeg.org/)** multimedia framework that provides complete
control over the underlying pipeline including **access to almost any FFmpeg specification thinkable** such as framerate, resolution, hardware decoder(s),
complex filter(s), and pixel format(s) that are readily supported by all well known Computer Vision libraries.
FFdecoder API **compiles its FFmpeg pipeline** by processing input Video Source metadata and User-defined options, and **runs it inside a
[`subprocess`](https://docs.python.org/3/library/subprocess.html) pipe** concurrently with the main thread, while extracting output dataframes(1D arrays)
into a Numpy buffer. These dataframes are consecutively grabbed from the buffer and decoded into ==[24-bit RGB](https://en.wikipedia.org/wiki/List_of_monochrome_and_RGB_color_formats#24-bit_RGB) _(default)_
[`ndarray`](https://numpy.org/doc/stable/reference/arrays.ndarray.html#the-n-dimensional-array-ndarray) 3D frames== that are readily available
through its [`generateFrame()`](#deffcode.ffdecoder.FFdecoder.generateFrame) method.
FFdecoder API **employs [Sourcer API](../../reference/sourcer) at its backend** for gathering, processing, and validating metadata of all
multimedia streams available in the given source for formulating/compiling its default FFmpeg pipeline. This metadata information is also
available as a JSON string with its [`metadata`](#deffcode.ffdecoder.FFdecoder.metadata) property object and can be updated as desired.
FFdecoder API **supports a wide-ranging media stream** as input source such as USB/Virtual/IP Camera Feed, Multimedia video file,
Screen Capture, Image Sequence, Network protocols _(such as HTTP(s), RTP/RTSP, etc.)_, so on and so forth.
Furthermore, FFdecoder API maintains the **standard [OpenCV-Python](https://docs.opencv.org/4.x/d6/d00/tutorial_py_root.html) _(Python API for OpenCV)_ coding syntax**, thereby making it even easier to
integrate this API in any Computer Vision application.
!!! example "For usage examples, kindly refer our **[Basic Recipes :cake:](../../recipes/basic)** and **[Advanced Recipes :croissant:](../../recipes/advanced)**"
!!! info "FFdecoder API parameters are explained [here ➶](params/)"
"""
def __init__(
    self,
    source: str | list[str],
    source_demuxer: str | list[str] | None = None,
    frame_format: str | None = None,
    custom_ffmpeg: str = "",
    verbose: bool = False,
    **ffparams: Any,
) -> None:
    """
    This constructor method initializes the object state and attributes of the FFdecoder Class.

    It sanitizes the user-defined parameters, probes the source through the Sourcer API,
    and selects the mode of operation. The FFmpeg pipeline itself is only built and
    launched later by `formulate()`.

    Parameters:
        source (str | list[str]): defines the input(`-i`) source filename/URL/device-name/device-path.
            Passing a list enables multi-input mode.
        source_demuxer (str | list[str]): specifies the demuxer(`-f`) for the input source;
            forwarded as-is to the Sourcer API.
        frame_format (str): sets pixel format(`-pix_fmt`) of the decoded frames.
        custom_ffmpeg (str): assigns the location of custom path/directory for custom FFmpeg executable.
        verbose (bool): enables/disables verbose.
        ffparams (dict): provides the flexibility to control supported internal and FFmpeg parameters.
            Internal keys consumed here: `-custom_sourcer_params`, `-ffprefixes`,
            `-ffmpeg_download_path`, `-default_stream_indexes`, `-enforce_cv_patch`,
            `-extract_luma`, `-extract_metadata`, `-disable_ffmpeg_window`,
            `-passthrough_audio`, `-framerate` and `-custom_resolution`. Everything
            else is kept for the FFmpeg pipeline.

    Raises:
        ValueError: if multi-input `-ffprefixes` is not a list of per-input lists or its
            length differs from `source`, or if no usable video stream is found.
    """
    # enable verbose if specified
    self.__verbose_logs = verbose if (verbose and isinstance(verbose, bool)) else False
    # define whether initializing
    self.__initializing = True
    # handle source list mapping
    self.__is_multi = isinstance(source, list)
    self.__source_list = source if self.__is_multi else [source]
    # define frame pixel-format for decoded frames
    self.__frame_format = (
        frame_format.lower().strip() if isinstance(frame_format, str) else None
    )
    # handles process to be frames written
    self.__process = None
    # handles disabling window for ffmpeg subprocess on Windows
    self.__ffmpeg_window_disabler_patch = False
    # checks machine OS
    self.__machine_OS = platform.system()
    # handles exclusive metadata
    self.__ff_pixfmt_metadata = None  # metadata
    self.__raw_frame_num = None  # raw-frame number
    self.__raw_frame_pixfmt = None  # raw-frame pixformat
    self.__raw_frame_dtype = None  # raw-frame dtype
    self.__raw_frame_depth = None  # raw-frame depth
    self.__raw_frame_resolution = None  # raw-frame resolution/dimension
    # define supported mode of operation
    self.__supported_opmodes = {
        "av": "Audio-Video",  # audio is only for pass-through, not really for audio decoding yet.
        "vo": "Video-Only",
        "imgseq": "Image-Sequence",
        # "ao":"Audio-Only", # reserved for future
    }
    # operation mode variable
    self.__opmode = None
    # handles termination
    self.__terminate_stream = False
    # cleans and reformat user-defined parameters: keys are always stripped strings,
    # while values are stringified unless they are `None` or a container/number
    self.__extra_params = {
        str(k).strip(): (
            str(v).strip()
            if v is not None and not isinstance(v, (dict, list, int, float, tuple))
            else v
        )
        for k, v in ffparams.items()
    }
    # handles custom Sourcer API params
    sourcer_params = self.__extra_params.pop("-custom_sourcer_params", {})
    # reset improper values
    sourcer_params = {} if not isinstance(sourcer_params, dict) else sourcer_params
    # handles user ffmpeg pre-headers(parameters such as `-re`) parameters (must be a list)
    self.__ffmpeg_prefixes = self.__extra_params.pop("-ffprefixes", [])
    # check if not valid type
    if not isinstance(self.__ffmpeg_prefixes, list):
        # log it
        logger.warning(
            "Discarding invalid `-ffprefixes` value of wrong type: `{}`!".format(
                type(self.__ffmpeg_prefixes).__name__
            )
        )
        # reset improper values
        self.__ffmpeg_prefixes = []
    elif self.__is_multi:
        # multi-input requires per-source list-of-lists with matching length;
        # reject ambiguous flat lists or length mismatches up front so the
        # cmd builder never sees a malformed shape.
        if self.__ffmpeg_prefixes:
            if not all(isinstance(p, list) for p in self.__ffmpeg_prefixes):
                raise ValueError(
                    "Multi-input `-ffprefixes` must be a list of per-input lists "
                    "(e.g. `[['-re'], ['-stream_loop', '-1']]`). "
                    "Flat lists are ambiguous in multi-input mode."
                )
            if len(self.__ffmpeg_prefixes) != len(self.__source_list):
                raise ValueError(
                    "`-ffprefixes` length ({}) must match `source` list length ({})!".format(
                        len(self.__ffmpeg_prefixes), len(self.__source_list)
                    )
                )
        sourcer_params["-ffprefixes"] = self.__ffmpeg_prefixes
    else:
        # single-input keeps the flat-list contract; nested lists are only
        # meaningful in multi-input mode, so discard them with a warning.
        if any(isinstance(p, list) for p in self.__ffmpeg_prefixes):
            logger.warning(
                "Nested lists in `-ffprefixes` are only supported for multi-input sources. Discarding!"
            )
            self.__ffmpeg_prefixes = []
        sourcer_params["-ffprefixes"] = self.__ffmpeg_prefixes
    # pass parameter(if specified) to Sourcer API, specifying where to save the downloaded FFmpeg Static
    # assets on Windows(if specified)
    sourcer_params["-ffmpeg_download_path"] = self.__extra_params.pop(
        "-ffmpeg_download_path", ""
    )
    # handle video and audio stream indexes in case of multiple ones.
    default_stream_indexes = self.__extra_params.pop("-default_stream_indexes", (0, 0))
    # reset improper values
    default_stream_indexes = (
        (0, 0)
        if not isinstance(default_stream_indexes, (list, tuple))
        else default_stream_indexes
    )
    # pass FFmpeg filter to Sourcer API params for processing
    # (the filter stays in the extra parameters too; `-vf` wins when both are given)
    if {"-vf", "-filter_complex"}.intersection(self.__extra_params.keys()):
        key = "-vf" if "-vf" in self.__extra_params else "-filter_complex"
        sourcer_params[key] = self.__extra_params[key]
    # define dict to store user-defined parameters
    self.__user_metadata = {}
    # extract and assign source metadata as dict
    (self.__sourcer_metadata, self.__missing_prop) = (
        Sourcer(
            source=source,
            source_demuxer=source_demuxer,
            verbose=verbose,
            custom_ffmpeg=custom_ffmpeg if isinstance(custom_ffmpeg, str) else "",
            **sourcer_params,
        )
        .probe_stream(default_stream_indexes=default_stream_indexes)
        .retrieve_metadata(force_retrieve_missing=True)
    )
    # handle valid FFmpeg assets location
    self.__ffmpeg = self.__sourcer_metadata["ffmpeg_binary_path"]
    # handle YUV pixel formats(such as `yuv420p`, `yuv444p`, `nv12`, `nv21` etc.)
    # patch for compatibility with OpenCV APIs.
    self.__cv_patch = self.__extra_params.pop("-enforce_cv_patch", False)
    if not isinstance(self.__cv_patch, bool):
        self.__cv_patch = False
    # announce the patch only when it is genuinely enabled, exactly like
    # the `-extract_luma` handling right below
    if self.__cv_patch:
        self.__verbose_logs and logger.critical(
            "Enforcing OpenCV compatibility patch for YUV/NV video frames."
        )
    # handle Direct Luma (Grayscale) Extraction patch for YUV/NV streams
    self.__extract_luma = self.__extra_params.pop("-extract_luma", False)
    if not isinstance(self.__extract_luma, bool):
        self.__extract_luma = False
    if self.__extract_luma:
        self.__verbose_logs and logger.critical(
            "Enforcing Direct Luma (Grayscale) Extraction for YUV/NV video frames."
        )
    # handle asynchronous per-frame metadata extraction via `showinfo` filter
    # when enabled, `generateFrame()` yields (frame, meta_dict) tuples
    self.__extract_metadata = self.__extra_params.pop("-extract_metadata", False)
    if not isinstance(self.__extract_metadata, bool):
        self.__extract_metadata = False
    # metadata queue and reader thread state (populated by __launch_FFdecoderline)
    self.__metadata_queue: queue.Queue[dict[str, Any]] | None = None
    self.__stderr_thread: threading.Thread | None = None
    self.__stderr_stop = threading.Event()
    if self.__extract_metadata:
        self.__verbose_logs and logger.critical(
            "Enabling asynchronous `showinfo` per-frame metadata extraction."
        )
    # handle disabling window for ffmpeg subprocess on Windows OS
    # this patch prevents ffmpeg creation window from opening when
    # building exe files
    ffmpeg_window_disabler_patch = self.__extra_params.pop("-disable_ffmpeg_window", False)
    if ffmpeg_window_disabler_patch and isinstance(ffmpeg_window_disabler_patch, bool):
        # check if value is valid
        if self.__machine_OS != "Windows" or self.__verbose_logs:
            logger.warning(
                "Optional `-disable_ffmpeg_window` flag is only available on Windows OS with `verbose=False`. Discarding!"
            )
        else:
            self.__ffmpeg_window_disabler_patch = ffmpeg_window_disabler_patch
    else:
        # handle improper values
        self.__ffmpeg_window_disabler_patch = False
    # handle pass-through audio mode works in conjunction with WriteGear [TODO]
    self.__passthrough_mode = self.__extra_params.pop("-passthrough_audio", False)
    if not isinstance(self.__passthrough_mode, bool):
        self.__passthrough_mode = False
    # handle mode of operation
    if self.__sourcer_metadata["source_has_image_sequence"]:
        # image-sequence mode
        self.__opmode = "imgseq"
    elif (
        self.__sourcer_metadata[
            "source_has_video"
        ]  # audio is only for pass-through, not really for audio decoding yet.
        and self.__sourcer_metadata["source_has_audio"]
        and self.__passthrough_mode  # [TODO]
    ):
        self.__opmode = "av"
    # elif __defop_mode == "ao" and self.__sourcer_metadata.contains_audio: # [TODO]
    #    self.__opmode = "ao"
    elif self.__sourcer_metadata["source_has_video"]:
        # video-only mode
        self.__opmode = "vo"
    else:
        # raise if unknown mode
        raise ValueError("Unable to find any usable video stream in the given source!")
    # store as metadata
    self.__missing_prop["ffdecoder_operational_mode"] = self.__supported_opmodes[self.__opmode]
    # handle user-defined output framerate
    user_framerate = self.__extra_params.pop("-framerate", None)
    if (
        isinstance(user_framerate, str)
        and user_framerate == "null"  # special mode to discard `-framerate/-r` parameter
    ):
        self.__inputframerate = user_framerate
    elif isinstance(user_framerate, (float, int)):
        # assign input framerate (non-positive values fall back to `0.0`, i.e. "not set")
        self.__inputframerate = float(user_framerate) if user_framerate > 0.0 else 0.0
    else:
        # warn if wrong type
        user_framerate is not None and logger.warning(
            "Discarding invalid `-framerate` value of wrong type `{}`!".format(
                type(user_framerate).__name__
            )
        )
        # reset to default
        self.__inputframerate = 0.0
    # handle user defined decoded frame resolution
    self.__custom_resolution = self.__extra_params.pop("-custom_resolution", None)
    if (
        isinstance(self.__custom_resolution, str)
        and self.__custom_resolution == "null"  # special mode to discard `-size/-s` parameter
    ) or (
        isinstance(self.__custom_resolution, (list, tuple))
        and len(self.__custom_resolution) == 2  # valid resolution(must be a tuple or list)
    ):
        # log it
        self.__verbose_logs and not isinstance(self.__custom_resolution, str) and logger.debug(
            "Setting raw frames size: `{}`.".format(self.__custom_resolution)
        )
    else:
        # log it
        self.__custom_resolution is not None and logger.warning(
            "Discarding invalid `-custom_resolution` value: `{}`!".format(
                self.__custom_resolution
            )
        )
        # reset improper values
        self.__custom_resolution = None
def formulate(self) -> FFdecoder:
    """
    This method formulates all necessary FFmpeg pipeline arguments and executes it inside the FFmpeg `subprocess` pipe.

    On the first call it resolves, in order: the video decoder, the optional frame
    limit, the output pixel-format (and matching numpy dtype), the output resolution
    and the output framerate, then hands the resulting parameters over to the
    pipeline launcher. Any later call only logs an error and leaves the pipeline alone.

    **Returns:** A reference to the FFdecoder class object.

    Raises:
        ValueError: if a multi-input source is given without `-map` or `-filter_complex`.
        RuntimeError: if no supported video decoder is available, or if neither a
            valid resolution nor a valid framerate can be determined.
    """
    # assign values to class variables on first run
    if self.__initializing:
        if self.__is_multi and not {"-map", "-filter_complex"}.intersection(
            self.__extra_params.keys()
        ):
            raise ValueError(
                "Multi-input setups require `-map` or `-filter_complex` to route the outputs unambiguously."
            )
        # prepare parameter dict
        input_params = OrderedDict()
        output_params = OrderedDict()
        # dynamically pre-assign a default video-decoder (if not assigned by user).
        supported_vdecodecs = get_supported_vdecoders(self.__ffmpeg)
        default_vdecodec = (
            self.__sourcer_metadata["source_video_decoder"]
            if self.__sourcer_metadata["source_video_decoder"] in supported_vdecodecs
            else "unknown"
        )
        # `-c:v` is treated as an alias of `-vcodec`
        if "-c:v" in self.__extra_params:
            self.__extra_params["-vcodec"] = self.__extra_params.pop("-c:v", default_vdecodec)
        # handle image sequence separately
        if self.__opmode == "imgseq":
            # -vcodec is discarded by default
            # (This is correct or maybe -vcodec required in some unknown case) [TODO]
            self.__extra_params.pop("-vcodec", None)
        elif "-vcodec" in self.__extra_params and self.__extra_params["-vcodec"] is None:
            # special case when -vcodec is not needed intentionally
            self.__extra_params.pop("-vcodec", None)
        else:
            # assign video decoder selected here.
            if "-vcodec" not in self.__extra_params:
                input_params["-vcodec"] = default_vdecodec
            else:
                input_params["-vcodec"] = self.__extra_params.pop("-vcodec", default_vdecodec)
            if (
                default_vdecodec != "unknown"
                and input_params["-vcodec"] not in supported_vdecodecs
            ):
                # reset to default if not supported
                logger.warning(
                    "Provided FFmpeg does not support `{}` video decoder. Switching to default supported `{}` decoder!".format(
                        input_params["-vcodec"], default_vdecodec
                    )
                )
                input_params["-vcodec"] = default_vdecodec
            # raise error if not valid decoder found
            if input_params["-vcodec"] not in supported_vdecodecs:
                raise RuntimeError(
                    "Provided FFmpeg does not support any known usable video-decoders."
                    " Either define your own manually or switch to another FFmpeg binaries(if available)."
                )
        # handle user-defined number of frames (`-vframes` is an alias of `-frames:v`).
        if "-vframes" in self.__extra_params:
            self.__extra_params["-frames:v"] = self.__extra_params.pop("-vframes", None)
        if "-frames:v" in self.__extra_params:
            value = self.__extra_params.pop("-frames:v", None)
            if value is not None and value > 0:
                output_params["-frames:v"] = value
        # dynamically calculate default raw-frames pixel format(if not assigned by user).
        # notify FFmpeg `-pix_fmt` parameter cannot be assigned directly
        if "-pix_fmt" in self.__extra_params:
            logger.warning(
                "Discarding user-defined `-pix_fmt` value as it can only be assigned with `frame_format` parameter!"
            )
            self.__extra_params.pop("-pix_fmt", None)
        # get supported FFmpeg pixfmt data with depth and bpp(bits-per-pixel)
        self.__ff_pixfmt_metadata = get_supported_pixfmts(self.__ffmpeg)
        supported_pixfmts = [fmts[0] for fmts in self.__ff_pixfmt_metadata]
        # calculate default pixel-format
        # Check special case - `frame_format`(or `-pix_fmt`) parameter discarded from pipeline
        self.__frame_format == "null" and logger.critical(
            "Manually discarding `frame_format`(or `-pix_fmt`) parameter from this pipeline."
        )
        # choose between rgb24(if available) or source pixel-format
        # otherwise, only source pixel-format for special case
        default_pixfmt = (
            "rgb24"
            if "rgb24" in supported_pixfmts and self.__frame_format != "null"
            else self.__sourcer_metadata["source_video_pixfmt"]
        )
        # assign output raw-frames pixel format
        rawframe_pixfmt = None
        if self.__frame_format is not None and self.__frame_format in supported_pixfmts:
            # check if valid and supported `frame_format` parameter assigned
            rawframe_pixfmt = self.__frame_format.strip()
            self.__verbose_logs and logger.info(
                "User-defined `{}` frame pixel-format will be used for this pipeline.".format(
                    rawframe_pixfmt
                )
            )
        elif (
            "output_frames_pixfmt"
            in self.__sourcer_metadata  # means `format` filter is defined
            and self.__sourcer_metadata["output_frames_pixfmt"] in supported_pixfmts
        ):
            # assign if valid and supported
            rawframe_pixfmt = self.__sourcer_metadata["output_frames_pixfmt"].strip()
            self.__verbose_logs and logger.info(
                "FFmpeg filter values will be used for this pipeline for defining output pixel-format."
            )
        else:
            # reset to default if not supported
            rawframe_pixfmt = default_pixfmt
            # log it accordingly
            if self.__frame_format is None:
                logger.info(
                    "Using default `{}` pixel-format for this pipeline.".format(default_pixfmt)
                )
            else:
                logger.warning(
                    "{} Switching to default `{}` pixel-format!".format(
                        (
                            "Provided FFmpeg does not supports `{}` pixel-format.".format(
                                self.__sourcer_metadata.get(
                                    "output_frames_pixfmt", self.__frame_format
                                )
                            )
                            if self.__frame_format != "null"
                            else "No usable pixel-format defined."
                        ),
                        default_pixfmt,
                    )
                )
        # dynamically calculate raw-frame datatype based on pixel-format selected
        (self.__raw_frame_depth, rawframesbpp) = next(
            (int(x[1]), int(x[2])) for x in self.__ff_pixfmt_metadata if x[0] == rawframe_pixfmt
        )
        raw_bit_per_component = (
            rawframesbpp // self.__raw_frame_depth if self.__raw_frame_depth else 0
        )
        if 4 <= raw_bit_per_component <= 8:
            self.__raw_frame_dtype = np.dtype("u1")
        elif 8 < raw_bit_per_component <= 16 and rawframe_pixfmt.endswith(("le", "be")):
            # 16-bit containers: the suffix fixes the byte order of each component
            if rawframe_pixfmt.endswith("le"):
                self.__raw_frame_dtype = np.dtype("<u2")
            else:
                self.__raw_frame_dtype = np.dtype(">u2")
        else:
            # reset to both pixel-format and datatype to default if not supported
            self.__frame_format is not None and logger.warning(
                "Selected pixel-format `{}` dtype is not supported by FFdecoder API. Switching to default `rgb24` pixel-format!".format(
                    rawframe_pixfmt
                )
            )
            rawframe_pixfmt = "rgb24"
            self.__raw_frame_dtype = np.dtype("u1")
            # keep the component count in sync with the fallback format, otherwise the
            # frame-size maths would still use the rejected format's depth
            self.__raw_frame_depth = next(
                (int(x[1]) for x in self.__ff_pixfmt_metadata if x[0] == "rgb24"), 3
            )
        # Check if not special case
        if self.__frame_format != "null":
            # assign to FFmpeg pipeline otherwise
            output_params["-pix_fmt"] = rawframe_pixfmt
        # assign to global parameter further usage
        self.__raw_frame_pixfmt = rawframe_pixfmt
        # also override as metadata(if available)
        if "output_frames_pixfmt" in self.__sourcer_metadata:
            self.__sourcer_metadata["output_frames_pixfmt"] = self.__raw_frame_pixfmt
        # handle raw-frame resolution
        # notify FFmpeg `-s` parameter cannot be assigned directly
        if "-s" in self.__extra_params:
            logger.warning(
                "Discarding user-defined `-s` FFmpeg parameter as it can only be assigned with `-custom_resolution` attribute! Read docs for more details."
            )
            self.__extra_params.pop("-s", None)
        # assign output rawframe resolution
        if self.__custom_resolution is not None and not isinstance(
            self.__custom_resolution, str
        ):
            # assign if assigned by user and not "null"(str)
            self.__raw_frame_resolution = self.__custom_resolution
            self.__verbose_logs and logger.info(
                "User-defined `{}` frame resolution will be used for this pipeline.".format(
                    self.__raw_frame_resolution
                )
            )
        elif (
            "output_frames_resolution"
            in self.__sourcer_metadata  # means `scale` filter is defined
            and self.__sourcer_metadata["output_frames_resolution"]
            and len(self.__sourcer_metadata["output_frames_resolution"]) == 2
        ):
            # calculate raw-frame resolution/dimensions based on output.
            self.__raw_frame_resolution = self.__sourcer_metadata["output_frames_resolution"]
        elif (
            self.__sourcer_metadata["source_video_resolution"]
            and len(self.__sourcer_metadata["source_video_resolution"]) == 2
        ):
            # calculate raw-frame resolution/dimensions based on source.
            self.__raw_frame_resolution = self.__sourcer_metadata["source_video_resolution"]
        else:
            # otherwise raise error; the message reflects the state of the
            # `-custom_resolution` attribute itself ("null" is its only string value)
            raise RuntimeError(
                "Both source and output metadata values found Invalid with {} `-custom_resolution` attribute. Aborting!".format(
                    "null" if isinstance(self.__custom_resolution, str) else "undefined"
                )
            )
        # special mode to discard `-size/-s` FFmpeg parameter completely
        if isinstance(self.__custom_resolution, str):
            logger.critical(
                "Manually discarding `-size/-s` FFmpeg parameter from this pipeline."
            )
        else:
            # add to pipeline
            dimensions = "{}x{}".format(
                self.__raw_frame_resolution[0], self.__raw_frame_resolution[1]
            )
            output_params["-s"] = str(dimensions)
        # log if filters or default source is used
        self.__verbose_logs and (
            self.__custom_resolution is None or isinstance(self.__custom_resolution, str)
        ) and logger.info(
            "{} for this pipeline for defining output resolution.".format(
                "FFmpeg filter values will be used"
                if "output_frames_resolution" in self.__sourcer_metadata
                else "Default source resolution will be used"
            )
        )
        # dynamically calculate raw-frame framerate based on source (if not assigned by user).
        if not isinstance(self.__inputframerate, str) and self.__inputframerate > 0.0:
            # assign if assigned by user and not "null"(str)
            output_params["-framerate"] = str(self.__inputframerate)
            self.__verbose_logs and logger.info(
                "User-defined `{}` output framerate will be used for this pipeline.".format(
                    str(self.__inputframerate)
                )
            )
        elif (
            "output_framerate" in self.__sourcer_metadata  # means `fps` filter is defined
            and self.__sourcer_metadata["output_framerate"] > 0.0
        ):
            # special mode to discard `-framerate/-r` FFmpeg parameter completely
            if self.__inputframerate == "null":
                logger.critical(
                    "Manually discarding `-framerate/-r` FFmpeg parameter from this pipeline."
                )
            else:
                # calculate raw-frame framerate based on output
                output_params["-framerate"] = str(self.__sourcer_metadata["output_framerate"])
                self.__verbose_logs and logger.info(
                    "FFmpeg filter values will be used for this pipeline for defining output framerate."
                )
        elif self.__sourcer_metadata["source_video_framerate"] > 0.0:
            # special mode to discard `-framerate/-r` FFmpeg parameter completely
            if self.__inputframerate == "null":
                logger.critical(
                    "Manually disabling `-framerate/-r` FFmpeg parameter for this pipeline."
                )
            else:
                # calculate raw-frame framerate based on source
                output_params["-framerate"] = str(
                    self.__sourcer_metadata["source_video_framerate"]
                )
                self.__verbose_logs and logger.info(
                    "Default source framerate will be used for this pipeline for defining output framerate."
                )
        else:
            # otherwise raise error
            raise RuntimeError(
                "Both source and output metadata values found Invalid with {} `-framerate` attribute. Aborting!".format(
                    "null" if isinstance(self.__inputframerate, str) else "undefined"
                )
            )
        # add rest to output parameters
        output_params.update(self.__extra_params)
        # chain the `showinfo` filter onto the pipeline when per-frame
        # metadata extraction is enabled. A pre-existing `-vf` is preserved
        # via comma-concatenation; `-filter_complex` is not supported here
        # because graph-label routing is ambiguous.
        if self.__extract_metadata:
            if "-filter_complex" in output_params:
                logger.warning(
                    "`-extract_metadata` is incompatible with `-filter_complex`. Disabling metadata extraction."
                )
                self.__extract_metadata = False
            else:
                existing_vf = output_params.get("-vf", "")
                output_params["-vf"] = f"{existing_vf},showinfo" if existing_vf else "showinfo"
        # dynamically calculate raw-frame numbers based on source (if not assigned by user).
        # TODO Added support for `-re -stream_loop` and `-loop`
        # The user-defined limit is stored in the *output* parameters above, so it
        # must be looked up there. Only plain integers are adopted because the
        # value later drives a `range()` in `generateFrame()`.
        user_frame_limit = output_params.get("-frames:v")
        if isinstance(user_frame_limit, int) and not isinstance(user_frame_limit, bool):
            self.__raw_frame_num = user_frame_limit
        elif (
            self.__sourcer_metadata["approx_video_nframes"] is not None
            and self.__sourcer_metadata["approx_video_nframes"] > 0
        ):
            self.__raw_frame_num = self.__sourcer_metadata["approx_video_nframes"]
        else:
            self.__raw_frame_num = None
            # log that number of frames are unknown
            self.__verbose_logs and logger.info(
                "Number of frames in given source are unknown. Live/Network/Looping stream detected!"
            )
        # log Mode of Operation
        self.__verbose_logs and logger.critical(
            "Activating {} Mode of Operation.".format(self.__supported_opmodes[self.__opmode])
        )
        # compose the Pipeline using formulated FFmpeg parameters
        self.__launch_FFdecoderline(input_params, output_params)
        # inform the initialization is completed
        self.__initializing = False
    else:
        # warn if pipeline is recreated
        logger.error("This pipeline is already created and running!")
    return self
def __fetchNextfromPipeline(self) -> np.ndarray | None:
"""
This Internal method to fetch next dataframes(1D arrays) from `subprocess` pipe's standard output(`stdout`) into a Numpy buffer.
"""
assert self.__process is not None, (
"Pipeline is not running! You must call `formulate()` method first."
)
# formulated raw frame size and apply YUV pixel formats patch(if applicable)
raw_frame_size = (
(self.__raw_frame_resolution[0] * (self.__raw_frame_resolution[1] * 3 // 2))
if self.__raw_frame_pixfmt.startswith(("yuv", "nv"))
and (self.__cv_patch or self.__extract_luma)
else (
self.__raw_frame_depth
* self.__raw_frame_resolution[0]
* self.__raw_frame_resolution[1]
)
)
# next dataframe as numpy ndarray
nparray = None
try:
# read bytes frames from buffer
nparray = np.frombuffer(
self.__process.stdout.read(raw_frame_size * self.__raw_frame_dtype.itemsize),
dtype=self.__raw_frame_dtype,
)
except Exception as e:
raise RuntimeError("Frame buffering failed with error: {}".format(str(e)))
return nparray if nparray is not None and len(nparray) == raw_frame_size else None
def __fetchNextFrame(self) -> np.ndarray | None:
"""
This Internal method grabs and decodes next 3D `ndarray` video-frame from the buffer.
"""
# Read next and reconstruct as numpy array
frame = self.__fetchNextfromPipeline()
# check if empty
if frame is None:
return frame
elif self.__extract_luma and self.__raw_frame_pixfmt.startswith(("yuv", "nv")):
# Extract pure Luma (Y channel) - sits uncompressed at the top of the YUV bytestream
# Slice the first W*H bytes and reshape to 2D
luma_size = self.__raw_frame_resolution[1] * self.__raw_frame_resolution[0]
frame = frame[:luma_size].reshape(
(
self.__raw_frame_resolution[1],
self.__raw_frame_resolution[0],
)
)
elif self.__raw_frame_pixfmt.startswith("gray"):
# reconstruct exclusive `gray` frames
frame = frame.reshape(
(
self.__raw_frame_resolution[1],
self.__raw_frame_resolution[0],
self.__raw_frame_depth,
)
)[:, :, 0]
elif self.__raw_frame_pixfmt.startswith(("yuv", "nv")) and self.__cv_patch:
# reconstruct exclusive YUV formats frames for OpenCV APIs
frame = frame.reshape(
self.__raw_frame_resolution[1] * 3 // 2,
self.__raw_frame_resolution[0],
)
else:
# reconstruct default frames
frame = frame.reshape(
(
self.__raw_frame_resolution[1],
self.__raw_frame_resolution[0],
self.__raw_frame_depth,
)
)
# return frame
return frame
def generateFrame(self) -> Generator[np.ndarray, None, None]:
"""
This method returns a [Generator function](https://wiki.python.org/moin/Generators)
_(also an Iterator using `next()`)_ of video frames, grabbed continuously from the buffer.
When the `-extract_metadata` parameter is enabled the generator yields
`(frame, metadata)` tuples, where `metadata` is a dict with keys
`frame_num`, `pts_time`, `is_keyframe`, and `frame_type`.
"""
if self.__raw_frame_num is None or not self.__raw_frame_num:
while not self.__terminate_stream: # infinite raw frames
frame = self.__fetchNextFrame()
if frame is None:
self.__terminate_stream = True
break
yield self.__attach_metadata(frame)
else:
for _ in range(self.__raw_frame_num): # finite raw frames
frame = self.__fetchNextFrame()
if frame is None:
self.__terminate_stream = True
break
yield self.__attach_metadata(frame)
def __attach_metadata(self, frame: np.ndarray):
"""
Internal: zip the just-decoded frame with the next queued metadata
dict when `-extract_metadata` is enabled. Uses a bounded timeout so a
mis-emitting filter chain can never deadlock the consumer.
"""
if not self.__extract_metadata:
return frame
try:
meta = self.__metadata_queue.get(timeout=10.0)
except queue.Empty:
logger.warning(
"Timed-out waiting for `showinfo` metadata. Yielding frame with empty metadata."
)
meta = None
return (frame, meta)
def __enter__(self) -> FFdecoder:
    """
    Entry hook of the `with` statement. See [PEP343 -- The 'with' statement'](https://peps.python.org/pep-0343/).

    **Returns:** Whatever `formulate()` returns.
    """
    decoder = self.formulate()
    return decoder
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """
    Exit hook of the `with` statement. See [PEP343 -- The 'with' statement'](https://peps.python.org/pep-0343/).

    Calls `terminate()` whether or not the block raised; the exception details are not inspected.
    """
    self.terminate()
    return None
@property
def metadata(self) -> str:
"""
A property object that dumps metadata information as JSON string.
**Returns:** Metadata as JSON string.
"""
# import dependency
import json
# return complete metadata information as JSON string
return json.dumps(
{
**self.__sourcer_metadata, # source video
**self.__missing_prop, # missing properties
**self.__user_metadata, # user-defined
},
indent=2,
)
@metadata.setter
def metadata(self, value: dict[str, Any]) -> None:
"""
A property object that updates metadata information with user-defined dictionary.
Parameters:
value (dict): User-defined dictionary.
"""
# check if value dict type
if value and isinstance(value, dict):
# log it
self.__verbose_logs and logger.info("Updating Metadata...")
# extract any source and output internal metadata keys
default_keys = set(value).intersection(
{**self.__sourcer_metadata, **self.__missing_prop}
)
# counterpart source properties for each output properties
counterpart_prop = {
"output_frames_resolution": "source_video_resolution",
"output_frames_pixfmt": "source_video_pixfmt",
"output_framerate": "source_video_framerate",
}
# iterate over source metadata keys and sanitize it
for key in default_keys or []:
if key == "source":
# metadata properties that cannot be altered
logger.warning(
"`{}` metadata property value cannot be altered. Discarding!".format(key)
)
elif key in self.__missing_prop:
# missing metadata properties are unavailable and read-only
# notify user about alternative counterpart property (if available)
logger.warning(
"`{}` metadata property is read-only".format(key)
+ (
". Try updating `{}` property instead!".format(counterpart_prop[key])
if key in counterpart_prop
else " and cannot be updated!"
)
)
elif isinstance(value[key], type(self.__sourcer_metadata[key])):
# check if correct datatype as original
self.__verbose_logs and logger.info(
"Updating `{}`{} metadata property to `{}`.".format(
key,
(" and its counterpart" if key in counterpart_prop.values() else ""),
value[key],
)
)
# update source metadata if valid
self.__sourcer_metadata[key] = value[key]
# also update missing counterpart property (if available)
counter_key = next((k for k, v in counterpart_prop.items() if v == key), "")
if counter_key:
self.__missing_prop[counter_key] = value[key]
else:
# otherwise discard and log it
logger.warning(
"Manually assigned `{}` metadata property value is of invalid type. Discarding!"
).format(key)
# delete invalid key
del value[key]
# There is no concept of a tuple in the JSON format.
# Python's `json` module converts Python tuples to JSON lists
# because that's the closest thing in JSON to a tuple.
any(isinstance(value[x], tuple) for x in value) and logger.warning(
"All TUPLE metadata properties will be converted to LIST datatype. Read docs for more details."
)
# update user-defined metadata
self.__user_metadata.update(value)
else:
# otherwise raise error
raise ValueError("Invalid datatype metadata assigned. Aborting!")
def __launch_FFdecoderline(
    self, input_params: dict[str, Any], output_params: dict[str, Any]
) -> None:
    """
    This Internal method assembles the final FFmpeg command and spawns it inside a `subprocess` pipe.

    The command is laid out as: FFmpeg binary, optional `-hide_banner`, prefixes, input parameters,
    optional `-f <demuxer>`, `-i <source>` (repeated per source in multi-source mode), output
    parameters, and finally `-f rawvideo -`. When metadata extraction is enabled, a daemon thread
    running `__read_stderr` is started right after the process is spawned.

    Parameters:
        input_params (dict): Input FFmpeg parameters
        output_params (dict): Output FFmpeg parameters
    """
    # flatten both parameter dictionaries into FFmpeg argument lists
    input_parameters = dict2Args(input_params)
    output_parameters = dict2Args(output_params)

    # the command starts with the FFmpeg binary; the banner is hidden unless verbose
    cmd = [self.__ffmpeg]
    if not self.__verbose_logs:
        cmd.append("-hide_banner")

    if self.__is_multi:
        for position, source in enumerate(self.__source_list):
            # per-source prefixes are used only when a list exists at this index
            has_prefix_list = len(self.__ffmpeg_prefixes) > position and isinstance(
                self.__ffmpeg_prefixes[position], list
            )
            if has_prefix_list:
                cmd.extend(self.__ffmpeg_prefixes[position])
            # apply standard input parameters ONLY to the primary source
            if position == 0:
                cmd.extend(input_parameters)
            # look up this source's own metadata (if per-source metadata is available)
            if "sources" in self.__sourcer_metadata:
                source_meta = self.__sourcer_metadata["sources"][position]
            else:
                source_meta = {}
            # force the demuxer only when one is set for this source
            if source_meta.get("source_demuxer"):
                cmd.extend(["-f", source_meta["source_demuxer"]])
            cmd.extend(["-i", source])
    else:
        cmd.extend(self.__ffmpeg_prefixes)
        cmd.extend(input_parameters)
        if "source_demuxer" in self.__sourcer_metadata:
            cmd.extend(["-f", self.__sourcer_metadata["source_demuxer"]])
        cmd.extend(["-i", self.__sourcer_metadata["source"]])

    # output parameters come last, followed by the raw video output target
    cmd.extend(output_parameters)
    cmd.extend(["-f", "rawvideo", "-"])

    # When metadata extraction is enabled stderr must be captured regardless
    # of verbose mode, otherwise the background reader thread would have
    # nothing to parse (verbose inherits parent stderr; silent discards it).
    if self.__extract_metadata:
        stderr_target = sp.PIPE
    else:
        stderr_target = None if self.__verbose_logs else sp.DEVNULL

    if self.__verbose_logs:
        logger.debug("Executing FFmpeg command: `{}`".format(" ".join(cmd)))
        creation_flags = 0
    else:
        # In silent mode, this prevents ffmpeg creation window from opening
        # when building exe files on Windows
        creation_flags = sp.DETACHED_PROCESS if self.__ffmpeg_window_disabler_patch else 0

    # compose the FFmpeg process
    self.__process = sp.Popen(
        cmd,
        stdin=sp.DEVNULL,
        stdout=sp.PIPE,
        stderr=stderr_target,
        creationflags=creation_flags,
    )

    # spin up the stderr reader thread that parses `showinfo` lines and
    # feeds per-frame metadata dicts into the queue consumed by generateFrame()
    if self.__extract_metadata:
        self.__metadata_queue = queue.Queue()
        self.__stderr_stop.clear()
        reader = threading.Thread(target=self.__read_stderr, daemon=True)
        self.__stderr_thread = reader
        reader.start()
def __read_stderr(self) -> None:
"""
Internal: background daemon that parses FFmpeg `showinfo` lines off
stderr and pushes per-frame metadata dicts onto `__metadata_queue`.
Exits when FFmpeg closes stderr or when `__stderr_stop` is signalled.
"""
assert self.__process is not None and self.__process.stderr is not None
stderr = self.__process.stderr
try:
for line in iter(stderr.readline, b""):
if self.__stderr_stop.is_set():
break
decoded = line.decode("utf-8", errors="ignore")
match = _SHOWINFO_REGEX.search(decoded)
if not match:
continue
meta = {
"frame_num": int(match.group(1)),
"pts_time": float(match.group(2)),
"is_keyframe": bool(int(match.group(3))),
"frame_type": match.group(4),
}
self.__metadata_queue.put(meta)
except (ValueError, OSError):
# stderr pipe closed mid-readline during termination
pass
finally:
# sentinel so consumers unblock on EOF
self.__metadata_queue.put(None)
def terminate(self) -> None:
    """
    Safely terminates all processes.

    Flags the pipeline as closing, closes the pipes attached to the FFmpeg
    subprocess, stops the subprocess if it is still running (falling back to
    a forced kill when it does not exit in time), and joins the stderr reader
    thread. The same cleanup runs when FFmpeg has already exited on its own,
    so no pipe handle or thread reference is left behind. Calling this method
    repeatedly is safe.
    """
    # signal we are closing
    self.__verbose_logs and logger.debug("Terminating FFdecoder Pipeline...")
    self.__terminate_stream = True
    self.__stderr_stop.set()

    process = self.__process
    # nothing to release if no process was initiated at first place
    if process is None:
        logger.info("Pipeline already terminated.")
        return

    # FFmpeg may have exited by itself (e.g. end of stream). Its pipes are
    # still open on our side and must be released like in the running case.
    already_exited = process.poll() is not None

    # close our ends of the pipes; closing `stderr` makes the background
    # reader thread's pending read fail, which `__read_stderr` handles
    for pipe in (process.stdin, process.stdout, process.stderr):
        if pipe is not None:
            pipe.close()

    # terminate process if still processing
    if process.poll() is None:
        process.terminate()
    # wait for the exit, but never block forever: force-kill FFmpeg if it
    # does not honour the termination request within the grace period
    try:
        process.wait(timeout=5.0)
    except sp.TimeoutExpired:
        logger.warning("FFmpeg did not exit in time. Killing it forcefully.")
        process.kill()
        process.wait()

    # join the stderr reader thread so it does not outlive the pipeline
    if self.__stderr_thread is not None and self.__stderr_thread.is_alive():
        self.__stderr_thread.join(timeout=2.0)
    self.__stderr_thread = None
    self.__process = None

    logger.info(
        "Pipeline already terminated."
        if already_exited
        else "Pipeline terminated successfully."
    )