Quick Start Guide#
Getting to know Pipeline and Flow#
The first thing users need to know about pyservicemaker is the pipeline. A pipeline consists of various processing nodes linked together to process or manipulate data flows. Each node instance performs a specific function, such as capturing, decoding, processing, or rendering data. These nodes are connected in a chain, creating a pipeline through which the data flows.
A simple empty pipeline can be created with a given name as follows:
from pyservicemaker import Pipeline
pipeline = Pipeline("sample-pipeline")
So far the pipeline cannot do anything. To make it functional, developers define a data flow for the pipeline by creating a Flow on it. The Flow APIs give developers the flexibility to append operations one after another according to their specific goals. This approach allows for a modular and customizable workflow where operations can be added as needed. Every time a Flow method is called, it takes the intended output streams of the previous flow and uses them as the inputs of the current flow. Thus, the order of Flow methods determines how a flow works when the pipeline is started.
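As a minimal sketch of this chaining (the media file path is hypothetical, and the capture and render operations used here are introduced in the sections that follow), a flow can also be built up by appending one operation at a time, assuming each Flow method returns the updated flow as the chained calls below imply:
from pyservicemaker import Pipeline, Flow
pipeline = Pipeline("chained")
flow = Flow(pipeline)
flow = flow.capture(["/path/to/video.mp4"])  # hypothetical input file
flow = flow.render()  # appended after capture, so it takes the captured stream as input
flow()  # a complete flow is callable and runs the pipeline until processing finishes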
Create a Simple Video Player Application#
The code snippet below demonstrates how to create a simple flow to decode and display a video file.
from pyservicemaker import Pipeline, Flow
pipeline = Pipeline("playback")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).render()()
Once the flow is complete, it can be invoked with ‘()’ because it is callable. This invocation keeps the pipeline running until the processing is finished. Alternatively, developers can call Pipeline.start() and Pipeline.wait(). The code snippet below is effectively equivalent to the one above.
from pyservicemaker import Pipeline, Flow
pipeline = Pipeline("playback")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).render()
pipeline.start()
pipeline.wait()
Create an Object Detection Application#
A more advanced pipeline for object detection, built on top of the Simple Video Player Application, can be constructed using the infer method for inference as follows:
from pyservicemaker import Pipeline, Flow
pipeline = Pipeline("detector")
infer_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).batch_capture([video_file]).infer(infer_config).render()()
The application executes an object detection model based on ResNet18 on the captured video. The model’s configuration is specified in the infer_config, and detected objects are automatically converted into bounding boxes, which are overlaid on the display.
Customizing Object Detection Sample Application#
In many use cases, developers require direct access to inference results for various analyses. To achieve this, the application can be enhanced by attaching a probe to the inference result (for a deeper understanding of probes, please refer to Handling Buffers). This probe allows developers to define custom metadata operators that extract output tensors from the metadata associated with the output buffers (for more details on metadata, please refer to Leveraging Metadata).
Below is the enhanced version of the above code snippet.
from pyservicemaker import Pipeline, Flow, Probe, BatchMetadataOperator
import torch
class TensorOutput(BatchMetadataOperator):
    def handle_metadata(self, batch_meta):
        for frame_meta in batch_meta.frame_items:
            for user_meta in frame_meta.tensor_items:
                for n, tensor in user_meta.as_tensor_output().get_layers().items():
                    print(f"tensor name: {n}")
                    print(f"tensor object: {tensor}")
                    # operations on tensors:
                    torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())
pipeline = Pipeline("detector")
infer_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
probe = Probe('tensor_retriever', TensorOutput())
Flow(pipeline).batch_capture([video_file]).infer(infer_config, output_tensor_meta=True).attach(probe).render()()
The enhanced code retrieves inference results as tensors from the user metadata associated with the output buffers of the infer flow. This is done through a probe that utilizes a customized BatchMetadataOperator to extract and process the metadata.
Getting to know Buffer and Tensor#
In pyservicemaker, a Buffer represents a chunk of data that flows through the pipeline from one node to another. It contains the actual media data, such as a segment of audio or video, and is passed between nodes for processing or output. In some cases, developers would like to create their own buffers and inject them into the pipeline, or consume the buffers coming out of the pipeline. pyservicemaker provides two abstract classes for achieving these goals.
BufferProvider#
BufferProvider specifies an interface for developers to implement in order to generate buffers. The code snippet below demonstrates how to generate buffers from bytes with a subclass of BufferProvider (for demonstration, the method generates grey pictures):
from pyservicemaker import BufferProvider, Buffer
class MyBufferProvider(BufferProvider):
    def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
        super().__init__()
        self.width = width
        self.height = height
        self.format = format
        self.framerate = framerate
        self.device = device
        self.expected = 300  # assumed number of frames to generate before ending the stream
        self.count = 0

    def generate(self, size):
        if self.count == self.expected:
            return Buffer()  # an empty buffer signals the end of the stream
        self.count += 1
        data = [128]*(self.width*self.height*3)
        return Buffer(data)
Extra information, such as the format, width, height, and framerate, is recommended so that the pipeline can better understand what the data in the buffer represents.
Developers can create an empty buffer without any argument; injecting an empty buffer into the pipeline ends the stream.
To inject buffers into a pipeline, developers can use the inject flow with their customized BufferProvider. The code snippet below shows how to inject video buffers into a pipeline and display them:
Flow(Pipeline("playback")).inject([MyBufferProvider(640, 480)]).render()()
BufferRetriever#
BufferRetriever specifies a customizable interface for developers to consume the buffers. It must be used with the retrieve flow. The code snippet below shows how to get buffer data from the pipeline.
# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever
class MyBufferRetriever(BufferRetriever):
    def consume(self, buffer):
        tensor = buffer.extract(0)
        return 1
To utilize the buffer data, developers must extract it into a tensor. If the data is batched, multiple tensors may be extracted, requiring the use of a batch ID; if there is no batching, 0 should always be passed to the extract method. A subclass of BufferRetriever can be used together with the retrieve flow to get buffers from the pipeline:
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()
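As a sketch of how the extracted data might be used further, the retriever below (a hypothetical TorchBufferRetriever, assuming a non-batched stream so that batch ID 0 is used, and relying on the DLPack interoperability described in the Tensor section below) hands the tensor over to PyTorch:
import torch
from pyservicemaker import BufferRetriever

class TorchBufferRetriever(BufferRetriever):
    def consume(self, buffer):
        # batch ID 0, since the stream is not batched
        tensor = buffer.extract(0)
        # clone before sharing through DLPack so the original buffer stays untouched
        torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())
        print(torch_tensor.shape)
        return 1  # as in the MyBufferRetriever sample above
Such a retriever would be passed to the retrieve flow in the same way as MyBufferRetriever above.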
Tensor#
Tensor is essentially a multidimensional array that is widely used in deep learning space. The pyservicemaker Tensor facilitates data handling on the GPU and offers seamless interoperability with other frameworks through its compatibility with DLPack.
A common practice for re-using data from a pyservicemaker Tensor is to clone the tensor and pass it to a PyTorch Tensor. This ensures that the original data remains unchanged while enabling efficient integration with other frameworks for further processing. The code snippet below shows how to transform a pyservicemaker Tensor into a PyTorch tensor:
torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())
Similarly, developers can also create pyservicemaker Tensors from PyTorch tensors and then wrap them into a buffer. The code snippet below demonstrates how to generate buffers from PyTorch tensors within a customized BufferProvider, which can later be used by the inject flow:
from pyservicemaker import BufferProvider, ColorFormat, as_tensor
import torch
class MyBufferProvider(BufferProvider):
    def generate(self, size):
        torch_tensor = torch.load('tensor_data.pt')
        ds_tensor = as_tensor(torch_tensor, "HWC")
        return ds_tensor.wrap(ColorFormat.RGB)