Advanced Features#

The pyservicemaker package provides advanced APIs and utilities that let developers unlock the full potential of the DeepStream SDK.

Handling Buffers#

A Buffer object in Service Maker represents a batch of data along with corresponding metadata. It provides two properties, batch_size and batch_meta, for developers to access the data and metadata of a batch.

Buffer objects are usually passed through one of two interfaces, BufferOperator or BufferRetriever, which the application implements to handle buffer data within the pipeline. The two differ in where they attach and in what they may do with the buffer.

A BufferOperator is required by a Probe, while a BufferRetriever is required by a Receiver. Although it is technically possible to copy and process buffers within a probe, this is generally not recommended. Probes are intended primarily for inspection, and they typically execute on the critical path of the pipeline, so any time-consuming work inside them increases overall latency.

A BufferOperator only allows in-place operations on the buffer data, and the result is passed downstream to the other nodes in the same pipeline. A BufferRetriever, by contrast, is meant to consume the buffer at the end of the pipeline.

Below is a sample implementation of BufferOperator that skips frames. If handle_buffer returns False, the current buffer is dropped.

from pyservicemaker import BufferOperator, Probe

class FrameSkipper(BufferOperator):
    def __init__(self):
        super().__init__()
        self._frames = 0

    def handle_buffer(self, buffer):
        self._frames += 1
        return (self._frames%2) != 0

When the frame skipper is applied to the sample application above, as follows, the downstream elements receive only one frame in two.

pipeline.attach("mux", Probe("probe", FrameSkipper()))
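The same pattern generalizes to keeping one frame in every N. The following is a minimal sketch, not part of the SDK: the class name EveryNthFrame and its keep_every parameter are hypothetical, and the try/except fallback exists only so the logic can be exercised on a machine without DeepStream installed.

```python
try:
    from pyservicemaker import BufferOperator
except ImportError:
    # Stand-in base class so the sketch can run without DeepStream installed.
    class BufferOperator:
        def __init__(self):
            pass


class EveryNthFrame(BufferOperator):
    """Hypothetical operator: pass the first buffer of every `keep_every` buffers."""

    def __init__(self, keep_every=2):
        super().__init__()
        if keep_every < 1:
            raise ValueError("keep_every must be >= 1")
        self._keep_every = keep_every
        self._seen = 0

    def handle_buffer(self, buffer):
        # Returning False drops the buffer; True passes it downstream.
        keep = (self._seen % self._keep_every) == 0
        self._seen += 1
        return keep
```

With keep_every=2 this makes the same keep/drop decisions as FrameSkipper, and it would be attached in the same way through a Probe.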

For heavy operations on buffers, the preferred approach is to use a BufferRetriever. By replacing ‘nveglglessink’ with ‘appsink’ in the above sample and attaching a Receiver with a BufferRetriever implementation, the application can access each buffer in the retriever’s ‘consume’ method. The most convenient way to access buffer data is to extract it into a tensor, a dlpack-compatible class designed to facilitate interoperability with other Python deep learning frameworks.

from pyservicemaker import BufferRetriever, Receiver

class MyBufferRetriever(BufferRetriever):
    def consume(self, buffer):
        # extract the data from the first buffer in a batch
        tensor = buffer.extract(0)

# attach the retriever to the 'appsink' element named "sink"
pipeline.attach("sink", Receiver("receiver", MyBufferRetriever()), tips="new-sample")

A Service Maker Tensor object can be cast to the corresponding tensor type in any dlpack-compatible framework, e.g. PyTorch:

from pyservicemaker import BufferRetriever, Receiver
import torch

class MyBufferRetriever(BufferRetriever):
    def consume(self, buffer):
        tensor = buffer.extract(0)
        torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())

Note

If developers want to keep the tensor data for later use, they must call the clone() method to duplicate the tensor. The BufferRetriever sits at the end of the pipeline, and the buffer is released after consume() returns.
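One way to keep heavy work off the pipeline thread is to clone the tensor in consume() and hand it to a worker through a queue. The sketch below is an illustration under assumptions: the class name QueueingRetriever and the drop-when-full policy are hypothetical choices, and the fallback base class exists only so the logic can run without DeepStream installed.

```python
import queue

try:
    from pyservicemaker import BufferRetriever
except ImportError:
    # Stand-in base class so the sketch can run without DeepStream installed.
    class BufferRetriever:
        def __init__(self):
            pass


class QueueingRetriever(BufferRetriever):
    """Hypothetical retriever that hands cloned tensors to a worker thread."""

    def __init__(self, out_queue):
        super().__init__()
        self._out_queue = out_queue

    def consume(self, buffer):
        # Clone before handing off: the buffer is released once consume() returns.
        tensor = buffer.extract(0).clone()
        try:
            self._out_queue.put_nowait(tensor)
        except queue.Full:
            # Drop the frame rather than block the pipeline thread.
            pass
```

A worker thread can then call get() on the same queue and process the cloned tensors at its own pace. A bounded queue keeps memory use predictable when the worker falls behind.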

A buffer object can also be created from a tensor. This approach is commonly used when implementing a BufferProvider, which works with the Feeder class and ‘appsrc’ to inject data into the pipeline. The sample below demonstrates how to load a PyTorch RGB tensor and inject it into the pipeline.

from pyservicemaker import BufferProvider, ColorFormat, as_tensor
import torch

class MyBufferProvider(BufferProvider):
    def generate(self, size):
        torch_tensor = torch.load('tensor_data.pt')
        ds_tensor = as_tensor(torch_tensor, "HWC")
        return ds_tensor.wrap(ColorFormat.RGB)

By replacing the ‘urisrcbin’ with ‘appsrc’ in the original sample code, MyBufferProvider can be incorporated into the pipeline:

pipeline.add("appsrc", "src", {"caps": f"video/x-raw(memory:NVMM), format=RGB, width={width}, height={height}, framerate=30/1", "do-timestamp": True})

A buffer can also be created from a Python list of byte values. For example, the code below generates solid black pictures:

from pyservicemaker import BufferProvider, Buffer

class MyBufferProvider(BufferProvider):

    def generate(self, size):
        data = [0]*(320*240*3)
        return Buffer(data)
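The byte list can carry any solid color, not only black. The helper below is a minimal sketch that builds such a list in pure Python. It assumes the list is interpreted as interleaved 8-bit RGB, which the 320*240*3 size in the example suggests but the text does not state; the function name solid_rgb_frame is hypothetical.

```python
def solid_rgb_frame(width, height, color=(0, 0, 0)):
    """Return a flat list of width*height*3 byte values in interleaved RGB order."""
    if width <= 0 or height <= 0:
        raise ValueError("width and height must be positive")
    if len(color) != 3 or any(not 0 <= c <= 255 for c in color):
        raise ValueError("color must be three values in 0..255")
    # Repeat the (R, G, B) triple once per pixel.
    return list(color) * (width * height)


black = solid_rgb_frame(320, 240)             # same content as [0] * (320 * 240 * 3)
red = solid_rgb_frame(320, 240, (255, 0, 0))  # assumed RGB channel order
```

The resulting list could be passed to Buffer(data) in generate() in the same way as the all-zero list above.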

Leveraging Metadata#

Metadata provides additional information about a buffer in DeepStream Service Maker. A BatchMetadata object, which can be retrieved from a buffer, serves as the root for all other metadata objects. Batch metadata manages a metadata pool and provides methods for initializing a variety of metadata objects.

The BatchMetadata object contains metadata for a batch of frames within a buffer. It carries the batching information such as size of the batch, number of frames in the batch, and also provides access to a list of FrameMetadata objects through its frame_items attribute.

The FrameMetadata objects encapsulate metadata for individual frames in the buffer batch. These frames can originate from different input streams. Through FrameMetadata, developers can access various pieces of information, including dimensions, timestamps, frame number, etc. Moreover, frame metadata carries other metadata on the frame level, including ObjectMetadata, DisplayMetadata and various user metadata.

The ObjectMetadata objects carry the results of object detection. These metadata objects are typically created by an object detection model within the ‘nvinfer’ plugin. The ObjectMetadata can then be utilized by the ‘osd (On-Screen Display)’ plugin for graphical representation. By leveraging ObjectMetadata, developers can visualize detected objects on the frame. When using customized models, developers can create their own ObjectMetadata from the model output for visualization.

The TensorOutputUserMetadata is a customized user metadata that carries the output of an inference model. When the output-tensor-meta property in the nvinfer plugin is set to True, TensorOutputUserMetadata objects will be attached to the frame metadata. This allows developers to have direct access to the output tensors of a model. By using TensorOutputUserMetadata, developers can directly work with the inference results, enabling advanced processing and analysis of model outputs.

The EventMessageUserMetadata is another type of user metadata that transforms object detection information into message objects. The nvmsgconv plugin uses this message metadata to post messages to various IoT servers. With EventMessageUserMetadata, developers can communicate detection events to external systems and integrate DeepStream Service Maker with IoT infrastructure.

For details of the metadata structure in DeepStream, refer to the DeepStream SDK metadata documentation.
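The hierarchy described above (batch, then frames, then objects) can be walked with two nested loops. The following sketch counts detected objects per class. The frame_items attribute is taken from the text; object_items and class_id are assumed attribute names that should be checked against the API reference. Stand-in objects are used so the traversal can be tried without a running pipeline.

```python
from collections import Counter
from types import SimpleNamespace


def count_objects_per_class(batch_meta):
    """Count detected objects by class id across all frames of a batch.

    `frame_items` comes from the text above; `object_items` and `class_id`
    are assumed attribute names and should be checked against the API.
    """
    counts = Counter()
    for frame_meta in batch_meta.frame_items:
        for object_meta in frame_meta.object_items:
            counts[object_meta.class_id] += 1
    return counts


# Stand-in objects that mimic the assumed shape of the metadata tree.
fake_batch = SimpleNamespace(frame_items=[
    SimpleNamespace(object_items=[SimpleNamespace(class_id=0),
                                  SimpleNamespace(class_id=2)]),
    SimpleNamespace(object_items=[SimpleNamespace(class_id=0)]),
])
print(count_objects_per_class(fake_batch))
```

In an application, a function like this would receive buffer.batch_meta, for example from inside a probe, since it only inspects the metadata.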

Common Factory#

The Service Maker Python binding supports creating custom objects directly from a Service Maker plugin.

from pyservicemaker import CommonFactory

# create a smart recording controller instance from "smart_recording_action" and name it "sr_controller"
CommonFactory.create("smart_recording_action", "sr_controller")

This feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app sample application.

Pipeline Messages#

While the pipeline is running, various messages are generated to inform the application about its status. Service Maker supports the ‘on_message’ parameter in the ‘start’ method, providing developers with an opportunity to inspect and respond to the messages they’re interested in.

The on_message callable takes two parameters: the current pipeline and the message received.

Service Maker offers bindings for the two most important messages:

StateTransitionMessage indicates a state transition within the pipeline:

name         description
old_state    the state before the transition
new_state    the state after the transition
origin       name of the node that triggered the message

DynamicSourceMessage indicates that a source was added or removed via the REST APIs. This message can be generated when ‘nvmultiurisrcbin’ is used.

name           description
source_added   whether the source was added or removed
source_id      index of the source
sensor_id      unique string identifying the sensor
sensor_name    a meaningful name for the sensor
uri            URI of the sensor

This feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app sample application.
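An on_message callback can tell the two message kinds apart by the fields listed in the tables above. The sketch below does this with duck typing so it does not depend on where the message classes are imported from. It assumes the fields are exposed as plain attributes; the helper name describe_message is hypothetical.

```python
from types import SimpleNamespace


def describe_message(message):
    """Return a one-line summary for the two message kinds, or None otherwise."""
    if hasattr(message, "new_state"):
        # Looks like a StateTransitionMessage.
        return f"{message.origin}: {message.old_state} -> {message.new_state}"
    if hasattr(message, "source_added"):
        # Looks like a DynamicSourceMessage.
        action = "added" if message.source_added else "removed"
        return (f"source {message.source_id} ({message.sensor_name}) "
                f"{action}: {message.uri}")
    return None  # a message type this sketch does not handle


def on_message(pipeline, message):
    """Callback with the two parameters described above: pipeline and message."""
    summary = describe_message(message)
    if summary is not None:
        print(summary)


# Stand-in message for trying the callback without a running pipeline.
on_message(None, SimpleNamespace(old_state="READY", new_state="PLAYING",
                                 origin="pipeline"))
```

The callback would be supplied through the ‘on_message’ parameter of the ‘start’ method. In a real application, an isinstance check against the message classes is the more explicit alternative.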

OSD#

Service Maker provides an ‘osd’ submodule for advanced display control, with the following basic graphical gadgets:

  • Text: a formatted string

  • Rect: a colorful rectangle

  • Line: a straight line

  • Arrow: an arrow

  • Circle: a circle

  • EventHandler: a pre-built class that responds to mouse events

This feature is demonstrated in the service-maker/sources/apps/python/flow_api/deepstream_test1_app sample application.

Engine File Monitor#

The EngineFileMonitor is defined in the ‘utils’ submodule and can be used to monitor a model engine file. Whenever the file is updated, inference restarts without disrupting the rest of the pipeline.

This feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app sample application.

Performance Monitor#

The PerfMonitor is defined in the ‘utils’ submodule and can be used to monitor the frame rate in real time. Before a PerfMonitor takes effect, it must be applied to a specific node within the pipeline. When it is used with ‘nvmultiurisrcbin’, each new source must be registered with the PerfMonitor using ‘add_stream’, and after a source is removed, ‘remove_stream’ must be called accordingly.

This feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app sample application.

MediaInfo#

A MediaInfo object can be used to retrieve information about a media source.

from pyservicemaker import utils

mediainfo = utils.MediaInfo.discover("sample.mp4")

MediaExtractor#

MediaExtractor is a convenience class in the ‘utils’ module for extracting video frames from multiple media sources. Each media source is defined by a MediaChunk object with ‘source’, ‘start_pts’ and ‘duration’. MediaExtractor is a callable and must be constructed with a list of input MediaChunk objects. Once invoked, it returns a list of Queue objects, one per media chunk, from which the decoded frames of that chunk can be retrieved. Each decoded video frame exposes its data through the ‘tensor’ attribute and its timestamp through the ‘timestamp’ attribute.

The code snippet below shows how to retrieve frames from media sources. It is only applicable to x86 systems:

from pyservicemaker import BufferProvider, Buffer, Flow, Pipeline, ColorFormat
from pyservicemaker.utils import MediaExtractor, MediaChunk
from concurrent.futures import ThreadPoolExecutor
import queue

VIDEO_FILE = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4"
N_CHUNKS = 8

class MyBufferProvider(BufferProvider):
    def __init__(self, frame_queue, width, height):
        print("MyBufferProvider")
        super().__init__()
        # named frame_queue so the parameter does not shadow the queue module
        self._queue = frame_queue
        self.format = "RGB"
        self.width = width
        self.height = height
        self.framerate = 30
        self.device = 'gpu'
        self.frames = 0

    def generate(self, size):
        try:
            frame = self._queue.get(timeout=2)
            self.frames += 1
            return frame.tensor.wrap(ColorFormat.RGB)
        except queue.Empty:
            print("Buffer empty")
            return Buffer()

chunks = [MediaChunk(f) for f in [VIDEO_FILE]*N_CHUNKS]
qs = MediaExtractor(chunks=chunks, batch_size=N_CHUNKS)()
with ThreadPoolExecutor(max_workers=N_CHUNKS) as exe:
    exe.map(lambda q: Flow(Pipeline("renderer")).inject([MyBufferProvider(q, 1280, 720)]).render(sync=False)(), qs)
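The example above gives each MediaChunk only a source, so all chunks decode the same file. To decode different segments of one file in parallel, each chunk needs its own ‘start_pts’ and ‘duration’. The sketch below only computes such pairs. It is an illustration under assumptions: the helper name split_evenly is hypothetical, the time unit expected by MediaChunk is not stated in the text, and the values are left in whatever integer unit the caller uses.

```python
def split_evenly(total_duration, n_chunks):
    """Split [0, total_duration) into n_chunks contiguous (start, duration) pairs.

    Uses integer arithmetic. Any remainder is spread over the first chunks so
    the pieces cover the whole range with no gaps and no overlap.
    """
    if n_chunks < 1:
        raise ValueError("n_chunks must be >= 1")
    if total_duration < 0:
        raise ValueError("total_duration must be >= 0")
    base, extra = divmod(total_duration, n_chunks)
    spans = []
    start = 0
    for i in range(n_chunks):
        duration = base + (1 if i < extra else 0)
        spans.append((start, duration))
        start += duration
    return spans


# Example: a clip of 10 time units split into 3 segments.
spans = split_evenly(10, 3)
```

Each (start, duration) pair would then supply the ‘start_pts’ and ‘duration’ of one MediaChunk. How those values are passed to the constructor should be checked against the API reference.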