Introduction to Pipeline APIs

For developers already familiar with the DeepStream SDK, pyservicemaker offers Pipeline APIs that enable them to fully harness the capabilities of DeepStream.

Creating a sample DeepStream application in Python with the Pipeline APIs closely mirrors the process with the C++ APIs, with one notable difference: it requires no Makefile or build step.

from pyservicemaker import Pipeline
import sys

CONFIG_FILE_PATH = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"

if __name__ == '__main__':
    pipeline = Pipeline("sample-pipeline")
    pipeline.add("nvurisrcbin", "src", {"uri": sys.argv[1]})
    pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720})
    pipeline.add("nvinferbin", "infer", {"config-file-path": CONFIG_FILE_PATH})
    pipeline.add("nvosdbin", "osd").add("nveglglessink", "sink")
    pipeline.link(("src", "mux"), ("", "sink_%u")).link("mux", "infer", "osd", "sink")
    pipeline.start().wait()

A functional pipeline requires appropriate elements from DeepStream plugins to be added, configured and linked correctly. This can be achieved with the Pipeline APIs in a fluent manner:

pipeline = Pipeline("sample-pipeline")
# nvstreammux is the factory name in DeepStream to create a streammuxer Element
# mux is the name of the Element instance
# a dictionary of key-value pairs configures the added Element
pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720})

The ‘add’ method is used to incorporate all necessary element nodes into a pipeline instance. This method takes the element’s registration name and node name as parameters, followed by a dictionary specifying the element’s properties. The node name given to an element during addition can be used to refer to the element within the pipeline.
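
The chained calls in the sample work because each call hands the pipeline back to the caller. The sketch below illustrates that fluent pattern with a hypothetical stand-in class, RecordingPipeline, which is not part of pyservicemaker: it only records the arguments it receives and makes no claim about how the real Pipeline is implemented.

```python
class RecordingPipeline:
    """Hypothetical stand-in that mimics the fluent call shape of Pipeline.

    It records calls instead of creating DeepStream elements.
    """

    def __init__(self, name):
        self.name = name
        self.nodes = {}   # node name -> (registration name, properties)
        self.links = []   # arguments passed to each link() call

    def add(self, factory, name, properties=None):
        self.nodes[name] = (factory, dict(properties or {}))
        return self  # returning self is what allows chaining

    def link(self, *args):
        self.links.append(args)
        return self


pipeline = (
    RecordingPipeline("sample-pipeline")
    .add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720})
    .add("nvosdbin", "osd")
    .add("nveglglessink", "sink")
    .link("mux", "osd", "sink")
)
print(sorted(pipeline.nodes))
```

The node names stored as dictionary keys play the role described above: later calls refer to an element by the name it was given when it was added.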

For details on how each property affects its element, the DS Plugin Overview is the primary and most comprehensive resource. Developers can also run gst-inspect-1.0 with an element’s registration name to check its technical specification. For example, the plugin manual shows that nvstreammux batches buffers from multiple inputs and requires “batch-size”, “width” and “height” to be set.
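
The same lookup can be scripted. The sketch below is a hypothetical helper (inspect_element is not a pyservicemaker function) that shells out to gst-inspect-1.0 and prints only the lines mentioning the three properties named above. It assumes the GStreamer tools are on PATH and returns None when they are not or when the element is unknown.

```python
import shutil
import subprocess


def inspect_element(factory_name):
    """Return gst-inspect-1.0 output for an element, or None if unavailable."""
    tool = shutil.which("gst-inspect-1.0")
    if tool is None:
        return None  # GStreamer tools are not on PATH
    result = subprocess.run(
        [tool, factory_name], capture_output=True, text=True, check=False
    )
    return result.stdout if result.returncode == 0 else None


if __name__ == "__main__":
    report = inspect_element("nvstreammux")
    if report is None:
        print("gst-inspect-1.0 or the nvstreammux element was not found")
    else:
        # Show only the lines that mention the properties discussed above.
        for line in report.splitlines():
            if any(key in line for key in ("batch-size", "width", "height")):
                print(line)
```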

After the element nodes are added into the pipeline, the ‘link’ method accomplishes the construction of the streaming path. This method offers two variations:

  • The simpler one accepts the names of all instances to be linked sequentially.

pipeline.link("mux", "infer", "osd", "sink")

  • The more sophisticated one links two instances using two tuples: the first holds the source and target node names, and the second holds the source and target pad names that identify the specific media streams.

pipeline.link(("src", "mux"), ("", "sink_%u"))

The second ‘link’ method primarily addresses dynamic paths, such as those encountered with the ‘nvstreammux’ element. This element creates its input pads on request from a template pad named “sink_%u”, so this method is required to establish the appropriate connections.
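
In GStreamer pad templates, “%u” stands for an unsigned integer, so “sink_%u” yields pad names such as sink_0 and sink_1. The sketch below shows that expansion and builds the argument pairs for the second ‘link’ form, one pair per source. Both helper functions and the source names src_0 and src_1 are hypothetical, and calling ‘link’ once per source is an assumption rather than something the sample above demonstrates.

```python
def request_pad_name(template, index):
    """Expand a numbered pad template such as 'sink_%u' for a given index."""
    if "%u" not in template:
        raise ValueError(f"not a numbered pad template: {template!r}")
    if index < 0:
        raise ValueError("%u stands for an unsigned integer")
    return template.replace("%u", str(index), 1)


def source_link_args(source_names, mux_name="mux", template="sink_%u"):
    """Build one (node names, pad names) pair per source for pipeline.link()."""
    return [((name, mux_name), ("", template)) for name in source_names]


print(source_link_args(["src_0", "src_1"]))
print([request_pad_name("sink_%u", i) for i in range(2)])
```

Each pair could then be unpacked into a call of the form pipeline.link(*pair), which matches the shape of the call shown above.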

To start the pipeline and wait until the stream reaches its end, call the ‘start’ method and then the ‘wait’ method.

pipeline.start().wait()

Now let’s save the code as sample_app.py and run the application from the console:

$ python3 sample_app.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4
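
The sample passes sys.argv[1] straight into the “uri” property, so the argument has to be a URI such as file:///... rather than a bare file system path. As a minimal sketch, a hypothetical helper like to_uri below (not part of pyservicemaker) could accept either form; it assumes a POSIX system.

```python
from pathlib import Path


def to_uri(source):
    """Return source unchanged if it already has a scheme, else a file:// URI."""
    if "://" in source:
        return source
    # absolute() avoids resolving symlinks, so the path stays as the user typed it.
    return Path(source).absolute().as_uri()


print(to_uri("file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4"))
print(to_uri("/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4"))
```

With such a helper, the source node could be added with {"uri": to_uri(sys.argv[1])} instead of the raw argument.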

Inference now runs as expected with the provided model configuration. To inspect the results, however, one more step is needed: attaching a probe.

A probe is a predefined class utilized to intercept the processed buffer and metadata. By providing it with a suitable implementation of BatchMetadataOperator, we can extract the object detection results from the metadata generated by ‘nvinfer’.

Below is a sample implementation of a BatchMetadataOperator that counts the objects and displays the numbers through the downstream ‘osd’ element. The pipeline calls the ‘handle_metadata’ method on every buffer batch, passing a ‘batch_meta’ object that wraps the batch metadata. Developers can iterate through ‘batch_meta’ for frame metadata, and through each frame metadata for object metadata.

In the sample code, handle_metadata inspects the object information for each frame within a batch, calculates the count for each class and appends a display metadata object containing a text label to the frame. Thus, the numbers get displayed in the video output. For more details about the metadata usage, refer to Leveraging Metadata.

from pyservicemaker import BatchMetadataOperator, Probe, osd

class ObjectCounterMarker(BatchMetadataOperator):
    def handle_metadata(self, batch_meta):
        for frame_meta in batch_meta.frame_items:
            vehicle_count = 0
            person_count = 0
            # Class ID 0 is counted as a vehicle and class ID 2 as a person.
            for object_meta in frame_meta.object_items:
                class_id = object_meta.class_id
                if class_id == 0:
                    vehicle_count += 1
                elif class_id == 2:
                    person_count += 1
            print(f"Object Counter: Pad Idx={frame_meta.pad_index},"
                  f"Frame Number={frame_meta.frame_number},"
                  f"Vehicle Count={vehicle_count}, Person Count={person_count}")
            text = f"Person={person_count},Vehicle={vehicle_count}"
            display_meta = batch_meta.acquire_display_meta()
            label = osd.Text()
            label.display_text = text.encode('ascii')
            label.x_offset = 10
            label.y_offset = 12
            label.font.name = osd.FontFamily.Serif
            label.font.size = 12
            label.font.color = osd.Color(1.0, 1.0, 1.0, 1.0)
            label.set_bg_color = True
            label.bg_color = osd.Color(0.0, 0.0, 0.0, 1.0)
            display_meta.add_text(label)
            frame_meta.append(display_meta)
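
The counting step can be tried without a running pipeline. The sketch below swaps the real metadata for stand-in objects built with SimpleNamespace that carry only a class_id attribute. The LABELS mapping and the count_by_label helper are hypothetical names, and the class IDs simply follow the sample above (0 as vehicle, 2 as person).

```python
from collections import Counter
from types import SimpleNamespace

# Class IDs follow the sample above: 0 is counted as vehicle, 2 as person.
LABELS = {0: "Vehicle", 2: "Person"}


def count_by_label(object_items, labels=LABELS):
    """Count objects per label, ignoring class IDs that are not in labels."""
    counts = Counter({label: 0 for label in labels.values()})
    for obj in object_items:
        label = labels.get(obj.class_id)
        if label is not None:
            counts[label] += 1
    return counts


# Stand-in for frame_meta.object_items; only class_id is modelled here.
fake_objects = [SimpleNamespace(class_id=c) for c in (0, 0, 2, 1, 0)]
counts = count_by_label(fake_objects)
print(",".join(f"{label}={n}" for label, n in sorted(counts.items())))
```

Keeping the counting in a plain function like this makes it easy to check in isolation before wiring it into ‘handle_metadata’.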

By attaching the above probe to the ‘infer’ node of the existing pipeline before starting it, we extract object count information from each frame of the video stream and display it both in the console output and as an overlay on the video:

pipeline.attach("infer", Probe("counter", ObjectCounterMarker()))

Now let’s run the Python application again. The object counts are printed to the console:

Object Counter: Pad Idx=0,Frame Number=0,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=1,Vehicle Count=15, Person Count=7
Object Counter: Pad Idx=0,Frame Number=2,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=3,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=4,Vehicle Count=15, Person Count=8
Object Counter: Pad Idx=0,Frame Number=5,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=6,Vehicle Count=11, Person Count=5
Object Counter: Pad Idx=0,Frame Number=7,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=8,Vehicle Count=19, Person Count=4
Object Counter: Pad Idx=0,Frame Number=9,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=10,Vehicle Count=13, Person Count=4
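
If the console output is captured to a file, the counts can be recovered for later analysis. The sketch below parses lines in exactly the format printed above. The regular expression and the parse_counter_line helper are illustrative and would need adjusting if the print statement in the operator changes.

```python
import re

LINE_RE = re.compile(
    r"Object Counter: Pad Idx=(?P<pad>\d+),"
    r"Frame Number=(?P<frame>\d+),"
    r"Vehicle Count=(?P<vehicle>\d+), Person Count=(?P<person>\d+)"
)


def parse_counter_line(line):
    """Return the four integers from one log line, or None if it does not match."""
    match = LINE_RE.search(line)
    if match is None:
        return None
    return {key: int(value) for key, value in match.groupdict().items()}


sample = [
    "Object Counter: Pad Idx=0,Frame Number=0,Vehicle Count=12, Person Count=6",
    "Object Counter: Pad Idx=0,Frame Number=1,Vehicle Count=15, Person Count=7",
]
rows = [parse_counter_line(line) for line in sample]
print(max(row["vehicle"] for row in rows))
```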

In addition to accepting a Probe instance created in Python code, the ‘attach’ method can also attach a probe from a shared library by specifying the module name. The code below attaches a pre-built probe for displaying the object information over ‘osd’:

pipeline.attach("infer", "sample_video_probe", "my_probe")

YAML configuration files for pipeline construction are supported, following the same specification as used by the C++ APIs. The pipeline above can be defined in YAML as follows:

deepstream:
  nodes:
  - type: nvurisrcbin
    name: src
    properties:
      uri: file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4
  - type: nvstreammux
    name: mux
    properties:
      batch-size: 1
      width: 1280
      height: 720
  - type: nvinferbin
    name: infer
    properties:
      config-file-path: /opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml
  - type: nvosdbin
    name: osd
  - type: nveglglessink
    name: sink
  edges:
    src: mux
    mux: infer
    infer: osd
    osd: sink
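
Every name under edges has to match a node declared under nodes. The sketch below checks this on a plain dictionary that mirrors only the shape of the YAML shown above, with properties omitted. The undefined_edge_nodes helper is hypothetical, and it assumes each edge maps one node name to one other node name, as in this example.

```python
config = {
    "deepstream": {
        "nodes": [
            {"type": "nvurisrcbin", "name": "src"},
            {"type": "nvstreammux", "name": "mux"},
            {"type": "nvinferbin", "name": "infer"},
            {"type": "nvosdbin", "name": "osd"},
            {"type": "nveglglessink", "name": "sink"},
        ],
        "edges": {"src": "mux", "mux": "infer", "infer": "osd", "osd": "sink"},
    }
}


def undefined_edge_nodes(cfg):
    """Return edge endpoints that are not declared under nodes."""
    section = cfg["deepstream"]
    declared = {node["name"] for node in section["nodes"]}
    used = set(section["edges"]) | set(section["edges"].values())
    return sorted(used - declared)


print(undefined_edge_nodes(config))
```

An empty list means every edge endpoint is declared. A misspelled node name in edges would show up in the returned list.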

With the YAML configuration saved as my_config.yaml, the application source reduces to a single line:

Pipeline(name="sample-pipeline", config_file="my_config.yaml").start().wait()

PipelineAPI Sample Applications Reference Table

Paths are relative to the service-maker/sources directory.

Reference test application: Sample test application 1
Path: apps/python/pipeline_api/deepstream_test1_app
Description: Sample of how to use DeepStream elements for a single H.264 stream inference using pipelineAPI: filesrc -> decode -> nvstreammux -> nvinfer or nvinferserver (primary detector) -> nvdsosd -> renderer. This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Reference test application: Sample test application 2
Path: apps/python/pipeline_api/deepstream_test2_app
Description: Sample of how to use DeepStream elements for a single H.264 stream cascaded inference using pipelineAPI: filesrc -> decode -> nvstreammux -> nvinfer or nvinferserver (primary detector) -> nvtracker -> nvinfer or nvinferserver (secondary classifier) -> nvdsosd -> renderer. This app uses resnet18_trafficcamnet_pruned.onnx for detection and 2 classifier models (i.e., resnet18_vehiclemakenet_pruned.onnx, resnet18_vehicletypenet_pruned.onnx).

Reference test application: Sample test application 3
Path: apps/python/pipeline_api/deepstream_test3_app
Description: Builds on pipeline_api/deepstream_test1 (sample test application 1) to demonstrate how to:

  • Use multiple sources in the pipeline for inference.

  • Use a uridecodebin to accept any type of input (e.g. RTSP/File).

  • Configure nvstreammux to generate a batch of frames and infer on it for better resource utilization.

  • Extract the stream metadata, which contains useful information about the frames in the batched buffer.

This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Reference test application: Sample test application 4
Path: apps/python/pipeline_api/deepstream_test4_app
Description: Builds on pipeline_api/deepstream_test1 for a single H.264 stream inference (filesrc -> decode -> nvstreammux -> nvinfer or nvinferserver -> nvdsosd -> renderer) to demonstrate how to:

  • Use the nvmsgconv and nvmsgbroker plugins in the pipeline.

  • Create NVDS_META_EVENT_MSG type metadata and attach it to the buffer.

  • Use NVDS_META_EVENT_MSG for different types of objects, e.g. vehicle and person.

This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Reference test application: Sample test application 5
Path: apps/python/pipeline_api/deepstream_test5_app
Description: Builds with pipelineAPI. Demonstrates:

  • Use of nvmsgconv and nvmsgbroker plugins in the pipeline for multistream inference.

  • How to configure the nvmsgbroker plugin from the config file as a sink plugin (for Kafka, Azure, etc.).

  • How to work with a remote Kafka server as producer and consumer.

  • Leveraging nvmultiurisrcbin for dynamic source management.

This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Reference test application: Appsrc and Appsink example
Path: apps/python/pipeline_api/deepstream_appsrc_test_app
Description: Demonstrates how to create a BufferProvider for a Feeder class and how to create a BufferRetriever for a receiver class. A Feeder with a customized BufferProvider can be used to inject user data into the DS pipeline, and a receiver with a customized BufferRetriever can be used to extract buffer data from the pipeline.

Reference test application: Kafka Custom Data example
Path: apps/python/pipeline_api/deepstream_kafka_test_app
Description: Sample of how to use DeepStream elements for a single H.264 stream inference and send custom inference data directly to a Kafka server using pipelineAPI: filesrc -> decode -> nvstreammux -> nvinfer or nvinferserver (primary detector) -> nvdsosd -> renderer. This app uses resnet18_trafficcamnet_pruned.onnx for detection.