Advanced Features
The pyservicemaker module provides advanced APIs and utilities that help developers unlock the full potential of the DeepStream SDK.
Handling Buffers
A Buffer object in Service Maker represents a batch of data along with its corresponding metadata. It provides two properties, batch_size and batch_meta, through which developers can access the data and metadata of a batch.
Buffer objects are commonly passed through either the BufferOperator or the BufferRetriever interface. Both are implemented within the application to handle buffer data in the pipeline, and they differ in two ways.
First, a Probe requires a BufferOperator, whereas a Receiver requires a BufferRetriever. Although it is technically possible to copy and process buffers within a probe, this is generally not recommended. Probes are intended primarily for inspection and typically run on the critical path of the pipeline, so any time-consuming processing in them increases overall latency.
Second, a BufferOperator allows only in-place operations on the buffer data, and the result is passed downstream to other nodes in the same pipeline, whereas a BufferRetriever is meant to consume the buffer at the end of the pipeline.
Below is a sample BufferOperator implementation that skips frames. If handle_buffer returns False, the current buffer is dropped.
from pyservicemaker import BufferOperator, Probe

class FrameSkipper(BufferOperator):
    def __init__(self):
        super().__init__()
        self._frames = 0

    def handle_buffer(self, buffer):
        self._frames += 1
        # keep odd-numbered buffers (1st, 3rd, ...) and drop the rest
        return (self._frames % 2) != 0
When the frame skipper is applied to the sample application above as follows, the downstream elements receive only one frame out of every two.
pipeline.attach("mux", Probe("probe", FrameSkipper()))
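The frame skipper above hard-codes a one-in-two ratio. The minimal sketch below generalizes it so that one buffer out of every ‘interval’ buffers is kept. The class name NthFrameKeeper and the ‘interval’ parameter are hypothetical. The try/except ImportError fallback exists only so the counting logic can run where pyservicemaker is not installed.

```python
try:
    from pyservicemaker import BufferOperator
except ImportError:
    # Fallback base class so the selection logic runs without DeepStream installed
    BufferOperator = object


class NthFrameKeeper(BufferOperator):
    """Keep one buffer out of every `interval` buffers and drop the rest."""

    def __init__(self, interval=3):
        super().__init__()
        if interval < 1:
            raise ValueError("interval must be at least 1")
        self._interval = interval
        self._count = 0

    def handle_buffer(self, buffer):
        # True lets the buffer continue downstream; False drops it
        keep = self._count % self._interval == 0
        self._count += 1
        return keep
```

With pyservicemaker available, it would be attached the same way as the frame skipper. For example, pipeline.attach("mux", Probe("probe", NthFrameKeeper(3))) passes one buffer in three.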
For heavy operations on buffers, the preferred approach is to use a BufferRetriever. After replacing ‘nveglglessink’ with ‘appsink’ in the sample above and attaching a Receiver with a BufferRetriever implementation, the buffer can be accessed in the retriever’s ‘consume’ method. The most convenient way to access buffer data is to extract it into a tensor, a dlpack-compatible class designed for interoperability with other Python deep learning frameworks.
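import queue
from pyservicemaker import BufferRetriever, Receiver
class MyBufferRetriever(BufferRetriever):
    def __init__(self, frame_queue):
        super().__init__()
        self._queue = frame_queue
    def consume(self, buffer):
        # extract the data of the first frame in the batch; clone it since the buffer is released after consume() returns
        self._queue.put(buffer.extract(0).clone())
        return 1  # report the buffer as consumed
frame_queue = queue.Queue()
pipeline.attach("sink", Receiver("receiver", MyBufferRetriever(frame_queue)), tips="new-sample")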
A Service Maker Tensor object can be converted to the corresponding tensor type of any dlpack-compatible framework, e.g. PyTorch:
from pyservicemaker import BufferRetriever, Receiver
import torch

class MyBufferRetriever(BufferRetriever):
    def consume(self, buffer):
        tensor = buffer.extract(0)
        # clone() keeps the data valid after consume() returns; from_dlpack shares that memory with PyTorch
        torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())
        return 1
Note
To preserve tensor data for later use, developers must duplicate the tensor with the clone() method. The BufferRetriever sits at the end of the pipeline, so the buffer is released after consume() returns.
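consume() is presumably invoked on the sink's streaming thread, so heavy work placed directly inside it can still hold up the end of the pipeline. One common pattern, sketched below under stated assumptions, is to clone each frame into a bounded queue.Queue and process it on a separate worker thread. The names QueueingRetriever and start_worker are hypothetical. Returning 1 from consume() is assumed to report the buffer as consumed. The import fallback exists only so the hand-off logic can run without DeepStream.

```python
import queue
import threading

try:
    from pyservicemaker import BufferRetriever
except ImportError:
    # Fallback base class so the hand-off logic runs without DeepStream installed
    BufferRetriever = object


class QueueingRetriever(BufferRetriever):
    """Clone the first frame of each batch and hand it off to a worker thread."""

    def __init__(self, frame_queue):
        super().__init__()
        self._frame_queue = frame_queue

    def consume(self, buffer):
        # clone() is required because the buffer is released after consume() returns
        tensor = buffer.extract(0).clone()
        try:
            # never block the pipeline: drop the frame if the worker is behind
            self._frame_queue.put_nowait(tensor)
        except queue.Full:
            pass
        return 1  # assumption: 1 reports the buffer as consumed


def start_worker(frame_queue, process):
    """Call process(tensor) for each queued tensor until None is queued."""

    def loop():
        while True:
            tensor = frame_queue.get()
            if tensor is None:  # sentinel: stop the worker
                break
            process(tensor)

    worker = threading.Thread(target=loop, daemon=True)
    worker.start()
    return worker
```

In the pipeline, the retriever would be attached as before, e.g. pipeline.attach("sink", Receiver("receiver", QueueingRetriever(frames)), tips="new-sample"). Launch start_worker(frames, process) beforehand, and call frames.put(None) at shutdown. Dropping frames when the queue is full trades completeness for keeping the pipeline moving. A blocking put() is the alternative if every frame must be processed.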
A Buffer object can also be created from a tensor. This approach is commonly used when implementing a BufferProvider, which works with the Feeder class and ‘appsrc’ to inject data into the pipeline. The sample below demonstrates how to load a PyTorch RGB tensor and inject it into the pipeline.
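from pyservicemaker import BufferProvider, ColorFormat, as_tensor
import torch

class MyBufferProvider(BufferProvider):
    def generate(self, size):
        torch_tensor = torch.load('tensor_data.pt')
        # wrap the torch tensor as a Service Maker tensor laid out as height x width x channels
        ds_tensor = as_tensor(torch_tensor, "HWC")
        # wrap the tensor into a Buffer carrying an RGB frame
        return ds_tensor.wrap(ColorFormat.RGB)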
After replacing ‘urisrcbin’ with ‘appsrc’ in the original sample code, MyBufferProvider can be attached to the ‘appsrc’ node through a Feeder:
from pyservicemaker import Feeder
pipeline.add("appsrc", "src", {"caps": f"video/x-raw(memory:NVMM), format=RGB, width={width}, height={height}, framerate=30/1", "do-timestamp": True})
pipeline.attach("src", Feeder("feeder", MyBufferProvider()), tips="need-data")
A buffer can also be created from a Python list of byte values. For example, the code below generates solid black frames:
from pyservicemaker import BufferProvider, Buffer

class MyBufferProvider(BufferProvider):
    def generate(self, size):
        # 320 x 240 pixels, 3 bytes (R, G, B) per pixel, all zero: a black frame
        data = [0] * (320 * 240 * 3)
        return Buffer(data)
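The black frame above is 320 x 240 pixels with three bytes (R, G, B) per pixel. The small helper sketched below generalizes this to any size and solid color. The name solid_rgb_frame is hypothetical. It assumes a packed, row-major RGB layout with one value per channel, which matches the byte count used above.

```python
def solid_rgb_frame(width, height, rgb=(0, 0, 0)):
    """Return a packed RGB frame as a flat list: R, G, B for each pixel, row by row."""
    if width <= 0 or height <= 0:
        raise ValueError("width and height must be positive")
    if len(rgb) != 3 or any(not 0 <= c <= 255 for c in rgb):
        raise ValueError("rgb must be three values in the range 0..255")
    # Repeating one [R, G, B] triple width * height times gives width * height * 3 values
    return list(rgb) * (width * height)
```

Inside generate(), returning Buffer(solid_rgb_frame(320, 240, (0, 0, 255))) would then produce blue frames. This assumes the caps declared on ‘appsrc’ describe an RGB format with the same width and height.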
Leveraging Metadata
Metadata provides additional information about a buffer in DeepStream Service Maker. A BatchMetadata object, which can be retrieved from a buffer, serves as the root of all other metadata objects. Batch metadata manages a metadata pool and provides methods for initializing a variety of metadata objects.
The BatchMetadata object contains metadata for a batch of frames within a buffer. It carries batching information, such as the batch size and the number of frames in the batch, and provides access to a list of FrameMetadata objects through its frame_items attribute.
FrameMetadata objects encapsulate metadata for individual frames in the batch, and these frames can originate from different input streams. Through FrameMetadata, developers can access information such as frame dimensions, timestamps and the frame number. Frame metadata also carries other frame-level metadata, including ObjectMetadata, DisplayMetadata and various kinds of user metadata.
ObjectMetadata objects carry the results of object detection. They are typically created by an object detection model within the ‘nvinfer’ plugin. The ‘osd’ (On-Screen Display) plugin can then use them to draw the detected objects on the frame. When using custom models, developers can create their own ObjectMetadata from the model output for visualization.
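To make the hierarchy concrete, the sketch below walks from a buffer’s batch_meta through frame_items to each frame’s object metadata and counts detections per class. frame_items comes from the description above. object_items and class_id are assumed attribute names, and count_objects_per_frame and ObjectCounter are hypothetical helpers. Because this is an inspection-only probe, it always returns True, so no buffer is dropped.

```python
from collections import Counter

try:
    from pyservicemaker import BufferOperator
except ImportError:
    # Fallback base class so the traversal runs without DeepStream installed
    BufferOperator = object


def count_objects_per_frame(batch_meta):
    """Return one Counter of class IDs per frame, in batch order.

    frame_items is documented; object_items and class_id are assumed names
    for a frame's ObjectMetadata list and an object's class.
    """
    return [
        Counter(obj.class_id for obj in frame_meta.object_items)
        for frame_meta in batch_meta.frame_items
    ]


class ObjectCounter(BufferOperator):
    """Inspection-only probe that records per-frame object counts."""

    def __init__(self):
        super().__init__()
        self.last_counts = []

    def handle_buffer(self, buffer):
        self.last_counts = count_objects_per_frame(buffer.batch_meta)
        return True  # never drop the buffer
```

It would be attached after the inference node, for example pipeline.attach("infer", Probe("counter", ObjectCounter())), where "infer" stands for whatever name the nvinfer node was given.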
TensorOutputUserMetadata is a custom user metadata type that carries the output of an inference model. When the output-tensor-meta property of the nvinfer plugin is set to True, TensorOutputUserMetadata objects are attached to the frame metadata. This gives developers direct access to the model’s output tensors for advanced processing and analysis.
EventMessageUserMetadata is another type of user metadata, which turns object detection information into message objects. The nvmsgconv plugin uses this metadata to build messages that are posted to various IoT servers. EventMessageUserMetadata therefore lets developers communicate detection events to external systems and integrate DeepStream Service Maker with IoT infrastructure.
For details of the metadata structure in DeepStream, refer to the DeepStream metadata documentation.
Common Factory
The Service Maker Python binding supports creating custom objects directly from a Service Maker plugin.
from pyservicemaker import CommonFactory
# create a smart recording controller instance from "smart_recording_action" and name it "sr_controller"
CommonFactory.create("smart_recording_action", "sr_controller")
Usage of this feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app application.
Pipeline Messages
While the pipeline is running, it generates various messages to inform the application of its status. The ‘start’ method accepts an ‘on_message’ parameter, giving developers an opportunity to inspect and respond to the messages they are interested in.
The on_message callable takes two parameters: the current pipeline and the received message.
Service Maker provides bindings for the two most important messages:
StateTransitionMessage indicates a state transition within the pipeline:

| name | description |
|---|---|
| old_state | the state before the transition |
| new_state | the state after the transition |
| origin | name of the node that triggered the message |

DynamicSourceMessage indicates that a source has been added or removed through the REST APIs. This message can be generated when ‘nvmultiurisrcbin’ is used.

| name | description |
|---|---|
| source_added | whether the source was added (otherwise it was removed) |
| source_id | index of the source |
| sensor_id | unique string identifying the sensor |
| sensor_name | human-readable name of the sensor |
| uri | URI of the sensor |

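A minimal on_message callable is sketched below. It tells the two message types apart by the attribute names listed in the tables above rather than by isinstance checks, so it does not assume where StateTransitionMessage and DynamicSourceMessage can be imported from. The helper describe_message is hypothetical.

```python
def describe_message(message):
    """Return a one-line summary of a pipeline message, or None if unrecognized."""
    if hasattr(message, "new_state"):
        # StateTransitionMessage: old_state, new_state, origin
        return f"{message.origin}: {message.old_state} -> {message.new_state}"
    if hasattr(message, "source_added"):
        # DynamicSourceMessage: source_added, source_id, sensor_id, sensor_name, uri
        action = "added" if message.source_added else "removed"
        return f"source {message.source_id} ({message.sensor_id}, {message.uri}) {action}"
    return None


def on_message(pipeline, message):
    # Matches the documented on_message signature: (pipeline, message)
    summary = describe_message(message)
    if summary is not None:
        print(summary)
```

Passing it to the pipeline would look like pipeline.start(on_message=on_message). This assumes the parameter is accepted by keyword under the name used in the text.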
Usage of this feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app application.
OSD
Service Maker provides an ‘osd’ submodule for advanced display control, with the following basic graphical elements:
Text: a formatted string
Rect: a colored rectangle
Line: a straight line
Arrow: an arrow
Circle: a circle
EventHandler: a prebuilt class for responding to mouse events
Usage of this feature is demonstrated in the service-maker/sources/apps/python/flow_api/deepstream_test1_app application.
Engine File Monitor
EngineFileMonitor is defined in the ‘utils’ submodule and can be used to monitor a model engine file. Whenever the file is updated, inference restarts without disrupting the rest of the pipeline.
Usage of this feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app application.
Performance Monitor
PerfMonitor is defined in the ‘utils’ submodule and can be used to monitor the frame rate in real time. A PerfMonitor takes effect only after it has been applied to a specific node within the pipeline. When it is used with ‘nvmultiurisrcbin’, each new source must be registered with the PerfMonitor through ‘add_stream’, and ‘remove_stream’ must be called after a source is removed.
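The add_stream/remove_stream bookkeeping described above can be driven from DynamicSourceMessage. The sketch below is a hypothetical SourceRegistry that tracks active sources and forwards add and remove events to caller-supplied callbacks. Those callbacks would wrap PerfMonitor.add_stream and PerfMonitor.remove_stream, whose exact arguments are not assumed here; deepstream_test5_app shows the real calls. Because the registry accepts (pipeline, message), an instance can serve as the on_message callable.

```python
class SourceRegistry:
    """Track sources reported by DynamicSourceMessage and forward add/remove events."""

    def __init__(self, on_added, on_removed):
        self.active = {}  # source_id -> uri of every source currently registered
        self._on_added = on_added
        self._on_removed = on_removed

    def __call__(self, pipeline, message):
        # Same signature as an on_message callable: (pipeline, message)
        if not hasattr(message, "source_added"):
            return  # not a dynamic-source message
        if message.source_added:
            self.active[message.source_id] = message.uri
            self._on_added(message)
        elif message.source_id in self.active:
            # only report removals of sources this registry saw being added
            del self.active[message.source_id]
            self._on_removed(message)
```

Keeping the registry separate from the PerfMonitor calls means a removal is forwarded only for a source that was previously registered.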
Usage of this feature is demonstrated in the service-maker/sources/apps/python/pipeline_api/deepstream_test5_app application.
MediaInfo
A MediaInfo object can be used to retrieve information about a media source.
from pyservicemaker import utils
mediainfo = utils.MediaInfo.discover("sample.mp4")
MediaExtractor
MediaExtractor is a convenience class in the utils module for extracting video frames from multiple media sources. Each media source is described by a MediaChunk object with ‘source’, ‘start_pts’ and ‘duration’. MediaExtractor is callable and must be constructed with a list of input MediaChunk objects. When invoked, it returns a list of Queue objects, one per media chunk, from which the decoded frames can be retrieved. From a decoded video frame, developers can access the data through its ‘tensor’ attribute and the timestamp through its ‘timestamp’ attribute.
The code snippet below shows how to retrieve frames from media sources. It applies only to x86 systems:
from concurrent.futures import ThreadPoolExecutor
import queue

from pyservicemaker import BufferProvider, Buffer, Flow, Pipeline, ColorFormat
from pyservicemaker.utils import MediaExtractor, MediaChunk

VIDEO_FILE = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4"
N_CHUNKS = 8

class MyBufferProvider(BufferProvider):
    def __init__(self, frame_queue, width, height):
        super().__init__()
        self._queue = frame_queue
        # properties describing the frames this provider produces
        self.format = "RGB"
        self.width = width
        self.height = height
        self.framerate = 30
        self.device = 'gpu'
        self.frames = 0

    def generate(self, size):
        try:
            # wait up to 2 seconds for the next decoded frame of this chunk
            frame = self._queue.get(timeout=2)
            self.frames += 1
            return frame.tensor.wrap(ColorFormat.RGB)
        except queue.Empty:
            # no frame arrived within the timeout: return an empty Buffer
            print("Buffer empty")
            return Buffer()

# N_CHUNKS chunks of the same sample video
chunks = [MediaChunk(f) for f in [VIDEO_FILE] * N_CHUNKS]
# invoking the extractor returns one frame queue per chunk
qs = MediaExtractor(chunks=chunks, batch_size=N_CHUNKS)()

# render each chunk in its own Flow; list() surfaces any exception raised in a worker thread
with ThreadPoolExecutor(max_workers=N_CHUNKS) as exe:
    list(exe.map(lambda q: Flow(Pipeline("renderer")).inject([MyBufferProvider(q, 1280, 720)]).render(sync=False)(), qs))
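The sample above feeds the whole file to every chunk. To make the chunks cover consecutive segments instead, start_pts and duration values can be computed up front, as in the hypothetical helper below. It works in whatever integer time unit the total duration is given in. Treating that unit as nanoseconds (the usual GStreamer unit) for MediaChunk is an assumption.

```python
def split_into_chunks(total_duration, n_chunks):
    """Split [0, total_duration) into n_chunks consecutive (start_pts, duration) pairs.

    Integer units are preserved; the last chunk absorbs any remainder so the
    durations always add up to total_duration.
    """
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    base = total_duration // n_chunks
    spans = []
    for i in range(n_chunks):
        start = i * base
        # the final chunk runs to the end so no tail is lost to integer division
        duration = total_duration - start if i == n_chunks - 1 else base
        spans.append((start, duration))
    return spans
```

The chunks could then be built as [MediaChunk(VIDEO_FILE, start_pts=s, duration=d) for s, d in split_into_chunks(total, N_CHUNKS)]. This assumes MediaChunk accepts start_pts and duration as keyword arguments and that total, the stream duration, is obtained separately (for example via MediaInfo).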
Prepare and Activate
For specific use cases, Service Maker provides alternative APIs for managing pipeline execution:
Pipeline.prepare()
Unlike pipeline.start(), which transitions the pipeline’s state to playing in a new thread, pipeline.prepare() sets the pipeline’s state to paused within the calling thread. This is particularly useful in multi-pipeline scenarios where each pipeline’s state needs to be changed to paused sequentially.
Pipeline.activate()
After pipeline.prepare(), pipeline.activate() transitions the pipeline’s state from paused to playing in a new thread, which effectively starts the pipeline. In multi-pipeline scenarios, this allows all pipelines to begin execution concurrently.
Pipeline.wait()
This API joins the threads created by pipeline.activate(), ensuring that they complete their execution before the caller proceeds.
The Sample Test 5 Application demonstrates how to use these API calls effectively.
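The prepare/activate/wait sequence for several pipelines can be sketched as follows. The function name run_pipelines_together is hypothetical. It only orders the documented calls: every pipeline is paused one after another in the calling thread, then all are started concurrently, then all are joined.

```python
def run_pipelines_together(pipelines):
    """Pause every pipeline in turn, start them all, then wait for all to finish."""
    # prepare() runs in the calling thread, so each pipeline reaches paused before the next
    for p in pipelines:
        p.prepare()
    # activate() moves each pipeline to playing in its own thread, so they run concurrently
    for p in pipelines:
        p.activate()
    # wait() joins the thread that activate() created for each pipeline
    for p in pipelines:
        p.wait()
```

Splitting the calls into three loops, rather than calling prepare(), activate() and wait() per pipeline, is what allows all pipelines to be paused first and then run concurrently.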
State Transitions in Service Maker Pipelines
Managing state transitions is essential for orderly pipeline operation. Service Maker supports synchronous state transitions, which keeps changes in the pipeline’s status smooth and predictable. The transitions are as follows:
Start
Transition: From idle to running.
Process: Call pipeline.start() to move the pipeline from the idle state to actively running.
Prepare and Activate
Transition: From idle to paused, then to running.
Process: Calling pipeline.prepare() followed by pipeline.activate() takes the pipeline through the paused state before it becomes fully operational. This sequence allows any necessary setup or configuration to happen before execution.
Wait
Purpose: Block the current thread until the pipeline completes its execution.
Process: pipeline.wait() is required after both the start and the prepare/activate sequences, so that the current thread waits until the pipeline has finished processing.
Stop
Transition: Back to idle.
Process: pipeline.stop() returns the pipeline to the idle state, halting operations and resetting it for future tasks.
Together, these transitions support both immediate execution and a preparatory phase before execution.
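The transitions above can be combined into a small hypothetical driver that always returns a pipeline to idle, even if waiting is interrupted (for example by Ctrl+C). It assumes that calling stop() after the pipeline has already finished is harmless.

```python
def run_to_completion(pipeline, use_prepare=False):
    """Drive one pipeline through the documented transitions and always return it to idle."""
    try:
        if use_prepare:
            pipeline.prepare()   # idle -> paused, in the calling thread
            pipeline.activate()  # paused -> running, in a new thread
        else:
            pipeline.start()     # idle -> running
        pipeline.wait()          # block until the pipeline finishes
    finally:
        pipeline.stop()          # back to idle, even after an error or interrupt
```

Putting stop() in a finally clause means the idle transition happens on every exit path, not only after a clean finish.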