Introduction to Pipeline APIs#
For developers already familiar with the DeepStream SDK, pyservicemaker offers Pipeline APIs that enable them to fully harness the capabilities of DeepStream.
Creating a sample Deepstream application in Python using Pipeline APIs closely mirrors the process with C++ APIs, with the notable distinction that it doesn’t require a Makefile or build process.
from pyservicemaker import Pipeline
import sys
CONFIG_FILE_PATH = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
if __name__ == '__main__':
pipeline = Pipeline("sample-pipeline")
pipeline.add("nvurisrcbin", "src", {"uri": sys.argv[1]})
pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720})
pipeline.add("nvinferbin", "infer", {"config-file-path": CONFIG_FILE_PATH})
pipeline.add("nvosdbin", "osd").add("nveglglessink", "sink")
pipeline.link(("src", "mux"), ("", "sink_%u")).link("mux", "infer", "osd", "sink")
pipeline.start().wait()
A functional pipeline requires appropriate elements from Deepstream plugins to be added, configured and linked correctly. This can be seamlessly achieved using Pipeline APIs in a fluent manner:
Pipeline pipeline("sample-pipeline")
// nvstreammux is the factory name in Deepstream to create a streammuxer Element
// mux is the name of the Element instance
// multiple key value pairs can be appended to configure the added Element
pipeline.add("nvstreammux", "mux", "batch-size", 1, "width", 1280, "height", 720)
The ‘add’ method is used to incorporate all necessary element nodes into a pipeline instance. This method takes the element’s registration name and node name as parameters, followed by a dictionary specifying the element’s properties. The node name given to an element during addition can be used to refer to the element within the pipeline.
For detailed insights into how each property affects the respective element, DS Plugin Overview serves as the primary and most comprehensive resource. Developers can also run gst-inspect-1.0 with element registration name to check its technical specification, e.g. after looking up the nvstreammux from the plugin manual, we know the element is for batching buffers from multiple input and it requires “batch-size”, “width” and “height” to be set.
After the element nodes are added into the pipeline, the ‘link’ method accomplishes the construction of the streaming path. This method offers two variations:
The simpler one accepts the names of all instances to be linked sequentially.
pipeline.link("mux", "infer", "osd", "sink")
The more sophisticated one links two instances, utilizing two tuples to specify the source name and target name, along with the source and target pads to indicate the specific media streams.
pipeline.link(("src", "mux"), ("", "sink_%u"))
The second ‘link’ method primarily addresses dynamic paths, such as those encountered with the ‘nvstreammux’ element. This element features a dynamic input and a template pad named “sink_%u,” which requires the use of this method to establish the appropriate connections.
To start the pipeline and wait until the stream reaches its end, the ‘start’ method and ‘wait’ method need to be called sequentially.
pipeline.start().wait()
Now let’s save the code as sample_app.py and run the application from the console:
$ python3 sample_app.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4
The inference is operating as expected with the provided model configuration. However, if we wish to inspect the results, additional steps are required by attaching a probe.
A probe is a predefined class utilized to intercept the processed buffer and metadata. By providing it with a suitable implementation of BatchMetadataOperator, we can extract the object detection results from the metadata generated by ‘nvinfer’.
Below is a sample implementation of a BatchMetadataOperator, which counts the objects and displays the numbers in the downstream ‘osd’ element. The ‘handle_metadata’ method is called within the pipeline on every buffer batch, with ‘batch_meta’ object wrapping the batch metadata. Developers can iterate through the ‘batch_meta’ object for frame metadata, then frame metadata for object metadata.
In the sample code, handle_metadata inspects the object information for each frame within a batch, calculates the count for each class and appends a display metadata object containing a text label to the frame. Thus, the numbers get displayed in the video output. For more details about the metadata usage, refer to Leveraging Metadata.
from pyservicemaker import BatchMetadataOperator, Probe, osd
class ObjectCounterMarker(BatchMetadataOperator):
def handle_metadata(self, batch_meta):
for frame_meta in batch_meta.frame_items:
vehcle_count = 0
person_count = 0
for object_meta in frame_meta.object_items:
class_id = object_meta.class_id
if class_id == 0:
vehcle_count += 1
elif class_id == 2:
person_count += 1
print(f"Object Counter: Pad Idx={frame_meta.pad_index},"
f"Frame Number={frame_meta.frame_number},"
f"Vehicle Count={vehcle_count}, Person Count={person_count}")
text = f"Person={person_count},Vehicle={vehcle_count}"
display_meta = batch_meta.acquire_display_meta()
label = osd.Text()
label.display_text = text.encode('ascii')
label.x_offset = 10
label.y_offset = 12
label.font.name = osd.FontFamily.Serif
label.font.size = 12
label.font.color = osd.Color(1.0, 1.0, 1.0, 1.0)
label.set_bg_color = True
label.bg_color = osd.Color(0.0, 0.0, 0.0, 1.0)
display_meta.add_text(label)
frame_meta.append(display_meta)
By attaching the above buffer probe into the inference plugin within the existing pipeline before starting it, we extract object count information from each frame of the video stream and display it both in the console output and as an overlay on the video:
pipeline.attach("infer", Probe("counter", ObjectCounterMarker()))
Now let’s run the python application again and we’ll see the object counts printed out:
Object Counter: Pad Idx=0,Frame Number=0,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=1,Vehicle Count=15, Person Count=7
Object Counter: Pad Idx=0,Frame Number=2,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=3,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=4,Vehicle Count=15, Person Count=8
Object Counter: Pad Idx=0,Frame Number=5,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=6,Vehicle Count=11, Person Count=5
Object Counter: Pad Idx=0,Frame Number=7,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=8,Vehicle Count=19, Person Count=4
Object Counter: Pad Idx=0,Frame Number=9,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=10,Vehicle Count=13, Person Count=4
In addition to creating a Probe instance from Python code, the ‘attach’ method can also attach a probe from a shared library by specifying the module name. Below code attaches a pre-built probe for displaying the object information over ‘osd’:
pipeline.attach("infer", "sample_video_probe", "my_probe")
YAML configuration files for pipeline construction are supported, following the same specification as used by C++ APIs. Below is how the above pipeline can be defined in YAML configuration:
deepstream:
nodes:
- type: nvurisrcbin
name: src
properties:
uri: file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4
- type: nvstreammux
name: mux
properties:
batch-size: 1
width: 1280
height: 720
- type: nvinferbin
name: infer
properties:
config-file-path: /opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml
- type: nvosdbin
name: osd
- type: nveglglessink
name: sink
edges:
src: mux
mux: infer
infer: osd
osd: sink
And with the YAML configuration being applied, the application source can be simplified to a single line:
Pipeline(name="sample-pipeline", config_file="my_config.yaml").start().wait()
PipelineAPI Sample Applications Reference Table#
Reference test application
Path inside service-maker/sources directory
Description
Sample test application 1
apps/python/pipeline_api/deepstream_test1_app
Sample of how to use DeepStream elements for a single H.264 stream inference using pipelineAPI: filesrc -> decode -> nvstreammux -> nvinfer or nvinferserver (primary detector) -> nvdsosd -> renderer. This app uses resnet18_trafficcamnet_pruned.onnx for detection.
Sample test application 2
apps/python/pipeline_api/deepstream_test2_app
Sample of how to use DeepStream elements for a single H.264 stream cascaded inference using pipelineAPI: filesrc -> decode -> nvstreammux -> nvinfer or nvinferserver (primary detector) -> nvtracker -> nvinfer or nvinferserver (secondary classifier) -> nvdsosd -> renderer. This app uses resnet18_trafficcamnet_pruned.onnx for detection and 2 classifier models (i.e., resnet18_vehiclemakenet_pruned.onnx, resnet18_vehicletypenet_pruned.onnx).
Sample test application 3
apps/python/pipeline_api/deepstream_test3_app
Builds on pipeline_api/deepstream_test1 (sample test application 1) to demonstrate how to:
Use multiple sources in the pipeline for inference.
Use a uridecodebin to accept any type of input (e.g. RTSP/File).
Configure nvstreammux to generate a batch of frames and infer on it for better resource utilization.
Extract the stream metadata, which contains useful information about the frames in the batched buffer.
This app uses resnet18_trafficcamnet_pruned.onnx for detection.
Sample test application 4
apps/python/pipeline_api/deepstream_test4_app
Builds on pipeline_api/deepstream_test1 for a single H.264 stream inference: filesrc, decode, nvstreammux, nvinfer or nvinferserver, nvdsosd, renderer to demonstrate how to:
Use the nvmsgconv and nvmsgbroker plugins in the pipeline.
Create NVDS_META_EVENT_MSG type metadata and attach it to the buffer.
Use NVDS_META_EVENT_MSG for different types of objects, e.g. vehicle and person.
This app uses resnet18_trafficcamnet_pruned.onnx for detection.
Sample test application 5
apps/python/pipeline_api/deepstream_test5_app
Builds with pipelineAPI. Demonstrates:
Use of nvmsgconv and nvmsgbroker plugins in the pipeline for multistream inference.
How to configure nvmsgbroker plugin from the config file as a sink plugin (for KAFKA, Azure, etc.).
How to work with a remote Kafaka server as producer and consumer.
Leveraging nvmultiurisrcbin for dynamic source management
This app uses resnet18_trafficcamnet_pruned.onnx for detection.
Appsrc and Appsink example
apps/python/pipeline_api/deepstream_appsrc_test_app
Demonstrates how to create a BufferProvider for a Feeder class and how to create a BufferRetriever for a receiver class. A Feeder with customized BufferProvider can be used to inject user data to the DS pipeline and a receiver with a customized BufferRetriever can be used to extract buffer data from the pipeline.