Introduction to Flow APIs#

Flow APIs abstract away the underlying pipeline details, allowing developers to focus on the goals of their specific tasks in a Pythonic style. These high-level APIs emphasize “what to do” rather than “how to do it,” enabling developers to express their intentions in a more intuitive and concise manner. This abstraction simplifies the development process and improves code readability and maintainability.

Besides the explicitly declared arguments, Flow APIs also allow experienced users to override standard DeepStream element properties through kwargs; the only trick is that every ‘-’ in a property name must be replaced with ‘_’.

For instance, users can specify the GPU they want to use when they create a flow for capturing:

# Create a capture flow on GPU 1
flow = Flow(Pipeline('capture')).capture([video_file], gpu_id=1)
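The same convention applies to dashed property names. As an illustrative sketch, assuming the underlying source element exposes a ‘file-loop’ property (as nvurisrcbin does), it would be passed as ‘file_loop’:

# Illustrative only: the element property 'file-loop' becomes the
# keyword argument 'file_loop' (every '-' replaced with '_')
flow = Flow(Pipeline('capture')).capture([video_file], file_loop=True)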

Within Service Maker, Flow supports a wide range of common operations in the multimedia and deep learning fields.

Capture#

The ‘capture’ method appends a capture operation to an empty flow. This operation takes a list of URIs as input, from which multiple streams carrying decoded data will be created within the flow.

# Flow for playing back an mp4 video
from pyservicemaker import Pipeline, Flow

video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render()()

Inject#

The ‘inject’ method appends an injection operation to an empty flow. The operation takes a list of BufferProvider objects and creates multiple streams from them.

In addition to overriding the ‘generate’ method to create buffers, BufferProvider objects must carry the following members to instruct the injection process:

  • width: width of the video frames

  • height: height of the video frames

  • format: format of the video frames, RGB or I420

  • framerate: framerate of the video

  • use_gpu: boolean value to indicate if the data is in GPU memory

from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer

class MyBufferProvider(BufferProvider):

    def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
        super().__init__()
        self.width = width
        self.height = height
        self.format = format
        self.framerate = framerate
        self.device = device
        self.count = 0
        self.expected = 255

    def generate(self, size):
        # fill a full RGB frame with the current counter value
        data = [self.count]*(self.width*self.height*3)
        if self.count < self.expected:
            self.count += 1
        # an empty Buffer ends the stream once 'expected' frames are produced
        return Buffer() if self.count == self.expected else Buffer(data)

p = MyBufferProvider(320, 240)
# play back the injected raw video buffers
Flow(Pipeline("playback")).inject([p]).render()()

Retrieve#

The ‘retrieve’ method appends a data retriever to the current flow. The operation takes an instance of BufferRetriever, which implements the ‘consume’ method to access buffers. Invocation of the method results in the end of a flow, and no further operations can be appended.

# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever

class MyBufferRetriever(BufferRetriever):
    def __init__(self):
        super().__init__()
        self.frames = 0

    def consume(self, buffer):
        # extract the first frame of the batch as a tensor
        tensor = buffer.extract(0)
        assert len(tensor.shape) == 3
        self.frames += 1
        return 1

video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()

Decode#

The ‘decode’ method appends a decoding operation to the current flow. The operation adds a decoder to each upstream. It is particularly useful when the data is injected via buffer providers.

class JpegBufferProvider(BufferProvider):

    def __init__(self, file_path:str):
        super().__init__()
        self._file_path = file_path
        self.format = "JPEG"
        self.width = 1280
        self.height = 720
        self.framerate = 0
        self.count = 0
        self.expected = 255

    def generate(self, size):
        # read the whole JPEG file as a list of byte values
        with open(self._file_path, "rb") as f:
            data = [int(b) for b in f.read()]
        if self.count < self.expected:
            self.count += 1
        # an empty Buffer ends the stream once 'expected' frames are produced
        return Buffer() if self.count == self.expected else Buffer(data)

# decode jpeg from a binary buffer
jpeg_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.jpg"
Flow(Pipeline("test")).inject([JpegBufferProvider(jpeg_file)]).decode().render()()

Batch#

The ‘batch’ method appends a batching operation to the current flow to combine all the streams into a single batched one. This operation takes ‘batch_size’, ‘width’, and ‘height’ as parameters. If these parameters are not given, the operation sets the batch size to the number of streams and the resolution to 1920 x 1080 by default.

uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# play back all the sources in a tiled display
Flow(Pipeline("playback")).capture(uri_list).batch().render()()

Batched Capture#

The ‘batch_capture’ method appends a capture operation to an empty flow and batches the inputs. The operation takes either a list of URIs or a source config file as input, forming a batched stream thereafter.

uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# playback 4 mp4 videos at the same time
Flow(Pipeline("playback")).batch_capture(uri_list).render()()

Render#

The ‘render’ method appends a renderer to the current flow to display or discard the video. The operation takes a render mode to decide how to render the data; DISPLAY (default) and DISCARD are supported so far. Moreover, optional named arguments cover all the standard sink control parameters. Invocation of the method results in the end of a flow, and no further operations can be appended.

# discard the video frames
from pyservicemaker import Pipeline, Flow, RenderMode

video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render(mode=RenderMode.DISCARD)()

Encode#

The ‘encode’ method appends an encoder to the current flow to encode the video data into a file or an RTSP stream. The operation takes a destination URI prefixed with ‘file://’ or ‘rtsp://’; if the prefix is missing, ‘file://’ is implied. In the case of an RTSP stream, a port number must appear in the URI. Moreover, optional parameters for encoding control are supported too:

  • profile: encoding profile; 0 for baseline (default), 1 for constrained baseline, 2 for main, 4 for high

  • iframeinterval: intra frame occurrence frequency, default 10

  • bitrate: encoding bitrate, default 2000000

# encode the captured video into a local mp4 file
pipeline = Pipeline("test")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).encode("output.mp4", sync=True)()
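Encoding to an RTSP stream works the same way, with the port number required in the URI; a minimal sketch (the destination address is illustrative):

# serve the encoded video as an RTSP stream on port 8554
Flow(Pipeline("rtsp_test")).capture([video_file]).encode("rtsp://localhost:8554", sync=True)()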

Infer#

The ‘infer’ method enables inference in the current flow. The operation takes a ‘config’ parameter for the model configuration file. Optional standard nvinfer parameters can be added to override the values in the configuration file.

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
# object detection using resnet18 for 4 streams
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
Flow(Pipeline("infer")).batch_capture(uri_list).infer(pgie_config).render()()

Track#

The ‘track’ method appends a tracker to the current flow for tracking the detected objects. The operation must come after ‘infer’ for detection data. Standard nvtracker parameters must be set appropriately to make the tracker work correctly.

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# object detection and tracking using nvmultiobjecttracker for 4 streams
Flow(Pipeline("tracker")).batch_capture(uri_list).infer(pgie_config).track(
    ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
    ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
).render()()

Publish#

The ‘publish’ method appends a procedure to the current flow for publishing events to a remote server. The operation takes the following parameters to set up the communication between the pipeline and the remote server:

  • msg_broker_proto_lib: the low-level library used by the message broker

  • msg_broker_conn_str: the connection string for the server

  • topic: topic name

  • msg_conv_config: the message converter config for source information

# publish the object data to a kafka server
Flow(Pipeline("publish")).batch_capture(
    "/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/deepstream_test5_app/source_list_dynamic.yaml"
).infer(
    "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).attach(
    what="add_message_meta_probe",
    name="message_generator"
).publish(
    msg_broker_proto_lib="/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so",
    msg_broker_conn_str="qvs-ds-kafka.nvidia.com;9092",
    topic="test4app",
)()

Invocation of the method results in the end of a flow, and no further operations can be appended.

Attach#

The ‘attach’ method attaches a Probe to the current flow. Two parameters are required, as listed below; a sketch follows the list.

  • what: a Probe object or the name of a probe in a shared library

  • name: the instance name if the probe is from a shared library
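Below is a minimal sketch of attaching a custom Probe object, assuming the Probe and BatchMetadataOperator interfaces from pyservicemaker used by the sample applications; the ObjectCounter operator and its counting logic are illustrative, not part of the API:

from pyservicemaker import Pipeline, Flow, Probe, BatchMetadataOperator

class ObjectCounter(BatchMetadataOperator):
    # illustrative operator: count the detected objects in every frame
    def handle_metadata(self, batch_meta):
        for frame_meta in batch_meta.frame_items:
            count = sum(1 for _ in frame_meta.object_items)
            print(f"detected objects: {count}")

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
# attach the probe after inference so it sees the detection metadata
Flow(Pipeline("attach")).batch_capture([video_file]).infer(pgie_config).attach(
    what=Probe("counter", ObjectCounter())
).render()()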

Fork#

The ‘fork’ method forks the current flow so that more than one flow can be appended.

# Initiate a pipeline to read an mp4 file,
# transcode the video to both a local file and an RTSP stream,
# and play it back at the same time
pipeline = Pipeline("test")
dest = "rtsp://localhost:8554"
flow = Flow(pipeline).capture(["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h265.mp4"]).batch().fork()
flow.encode(dest, sync=True)
flow.encode("/tmp/sample.mp4")
flow.render(sync=True)
flow()

The output RTSP stream can be received from “rtsp://localhost:8554/ds-test”.

Flow API Sample Applications Reference Table#

All paths are relative to the service-maker/sources directory.

Sample test application 1
  Path: apps/python/flow_api/deepstream_test1_app
  Sample of how to use Flow API methods for a single H.264 stream inference: batch_capture -> infer -> render. This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Sample test application 2
  Path: apps/python/flow_api/deepstream_test2_app
  Sample of how to use Flow API methods for a single H.264 stream cascaded inference: batch_capture -> infer (primary detector) -> track -> infer (secondary classifier) -> render. This app uses resnet18_trafficcamnet_pruned.onnx for detection and 2 classifier models (i.e., resnet18_vehiclemakenet_pruned.onnx, resnet18_vehicletypenet_pruned.onnx).

Sample test application 3
  Path: apps/python/flow_api/deepstream_test3_app
  Builds on flow_api/deepstream_test1 (sample test application 1) to demonstrate how to:

  • Use multiple sources in the pipeline for inference.

  • Extract the stream metadata, which contains useful information about the frames in the batched buffer.

  This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Sample test application 4
  Path: apps/python/flow_api/deepstream_test4_app
  Builds on flow_api/deepstream_test1 for a single H.264 stream inference to demonstrate how to use the publish method to send messages to a remote server and the fork method to simultaneously render the output. This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Inject and Retrieve Example
  Path: apps/python/flow_api/deepstream_appsrc_test_app
  Demonstrates how to create a BufferRetriever for the retrieve method. The retrieve method with a customized BufferRetriever can be used to extract buffer data from the pipeline.