nemo_rl.models.automodel.data#
Data processing utilities for automodel training and inference.
Module Contents#
Classes#
- ProcessedInputs: Processed microbatch inputs ready for model forward pass.
- ProcessedMicrobatch: Container for a processed microbatch ready for model forward pass.
Functions#
- make_processed_microbatch_iterator: Wrap a raw microbatch iterator to yield processed microbatches.
- get_microbatch_iterator: Create a processed microbatch iterator based on the batching strategy.
- process_microbatch: Process a microbatch and prepare inputs for the model forward pass.
- process_global_batch: Process a global batch and compute normalization factors.
API#
- class nemo_rl.models.automodel.data.ProcessedInputs#
Processed microbatch inputs ready for model forward pass.
This structure contains all necessary tensors and metadata for a forward pass, including context parallel buffers and flash attention configuration.
- input_ids: torch.Tensor#
None
- seq_len: int#
None
- attention_mask: Optional[torch.Tensor]#
None
- position_ids: Optional[torch.Tensor]#
None
- flash_attn_kwargs: dict[str, Any]#
'field(...)'
- vlm_kwargs: dict[str, Any]#
'field(...)'
- cp_buffers: list[torch.Tensor]#
'field(...)'
- seq_index: Optional[torch.Tensor]#
None
- property has_context_parallel: bool#
Check if context parallel is enabled.
- property has_flash_attention: bool#
Check if flash attention is configured.
Works for both an empty dict {} and dataclass objects such as FlashAttnKwargs.
- property is_multimodal: bool#
Check if this is a multimodal input.
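The three properties can be illustrated with a minimal, dependency-free sketch. `ProcessedInputsSketch` is a hypothetical stand-in (lists replace `torch.Tensor`), and the property bodies are assumptions about how the flags could be derived from the fields, not the library's actual implementation:

```python
from dataclasses import dataclass, field
from typing import Any, Optional

# Hypothetical stand-in for ProcessedInputs; lists replace torch.Tensor
# so the sketch runs without torch installed.
@dataclass
class ProcessedInputsSketch:
    input_ids: list[int]
    seq_len: int
    attention_mask: Optional[list[int]] = None
    position_ids: Optional[list[int]] = None
    flash_attn_kwargs: Any = field(default_factory=dict)
    vlm_kwargs: dict[str, Any] = field(default_factory=dict)
    cp_buffers: list[Any] = field(default_factory=list)
    seq_index: Optional[list[int]] = None

    @property
    def has_context_parallel(self) -> bool:
        # Context parallel is active when CP buffers were registered.
        return len(self.cp_buffers) > 0

    @property
    def has_flash_attention(self) -> bool:
        # Truthiness handles both an empty dict {} (falsy) and a
        # dataclass-style kwargs object (always truthy).
        return bool(self.flash_attn_kwargs)

    @property
    def is_multimodal(self) -> bool:
        # VLM kwargs are only populated for multimodal inputs.
        return bool(self.vlm_kwargs)

inputs = ProcessedInputsSketch(input_ids=[1, 2, 3], seq_len=3)
print(inputs.has_context_parallel, inputs.has_flash_attention, inputs.is_multimodal)
# prints: False False False
```

Using truthiness (rather than `len(...) == 0`) for `has_flash_attention` is what allows the same check to work for both plain dicts and dataclass objects.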
- class nemo_rl.models.automodel.data.ProcessedMicrobatch#
Container for a processed microbatch ready for model forward pass.
This dataclass holds both the original data dictionary and the processed tensors needed for the automodel forward pass. It follows the same pattern as ProcessedMicrobatch in nemo_rl/models/megatron/data.py.
.. attribute:: data_dict
The original BatchedDataDict containing raw batch data
.. attribute:: processed_inputs
ProcessedInputs containing all tensors for forward pass
.. attribute:: original_batch_size
Original batch size before any packing
.. attribute:: original_seq_len
Original sequence length before any packing
- data_dict: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any]#
None
- processed_inputs: nemo_rl.models.automodel.data.ProcessedInputs#
None
- original_batch_size: int#
None
- original_seq_len: int#
None
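A minimal sketch of the container pattern, assuming plain dicts stand in for `BatchedDataDict`: keeping `original_batch_size` and `original_seq_len` alongside the (possibly packed) tensors lets downstream code restore the pre-packing shape. The field values shown are illustrative, not the library's actual processing:

```python
from dataclasses import dataclass
from typing import Any

# Hypothetical stand-in for ProcessedMicrobatch; a plain dict replaces
# BatchedDataDict so the sketch runs without nemo_rl installed.
@dataclass
class ProcessedMicrobatchSketch:
    data_dict: dict[str, Any]   # original raw batch data
    processed_inputs: Any       # tensors ready for the forward pass
    original_batch_size: int    # batch size before any packing
    original_seq_len: int       # sequence length before any packing

raw = {"input_ids": [[1, 2, 3, 0], [4, 5, 0, 0]]}
mb = ProcessedMicrobatchSketch(
    data_dict=raw,
    # Illustrative: sequences packed into one row with padding dropped.
    processed_inputs={"input_ids": [1, 2, 3, 4, 5]},
    original_batch_size=len(raw["input_ids"]),
    original_seq_len=len(raw["input_ids"][0]),
)
print(mb.original_batch_size, mb.original_seq_len)
# prints: 2 4
```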
- nemo_rl.models.automodel.data.make_processed_microbatch_iterator(
- raw_iterator: Iterator[nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any]],
- tokenizer: transformers.AutoTokenizer,
- cfg: dict[str, Any],
- cp_size: int,
- )
Wrap a raw microbatch iterator to yield processed microbatches.
This function takes a raw iterator that yields BatchedDataDict objects and wraps it to yield ProcessedMicrobatch objects that contain both the original data and the processed tensors ready for model forward pass.
- Parameters:
raw_iterator – Iterator yielding raw BatchedDataDict microbatches
tokenizer – Tokenizer for processing
cfg – Configuration dictionary (enable_seq_packing is inferred from cfg["sequence_packing"]["enabled"])
cp_size – Context parallel size
- Yields:
ProcessedMicrobatch objects containing processed tensors ready for model forward
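The wrapping pattern described above can be sketched as a lazy generator. Everything here is a dependency-free stand-in: `process` is a hypothetical placeholder for the real per-microbatch processing, and plain dicts replace `BatchedDataDict` and `ProcessedMicrobatch`:

```python
from typing import Any, Iterator

def process(mb: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical stand-in for process_microbatch: here it just strips
    # trailing zero padding from each sequence.
    return {"input_ids": [[t for t in row if t != 0] for row in mb["input_ids"]]}

def make_processed_iterator(
    raw_iterator: Iterator[dict[str, Any]],
) -> Iterator[dict[str, Any]]:
    # Lazily wrap the raw iterator so each microbatch is processed on demand,
    # pairing the original data with the processed tensors.
    for mb in raw_iterator:
        yield {"data_dict": mb, "processed_inputs": process(mb)}

raw = iter([{"input_ids": [[1, 2, 0]]}, {"input_ids": [[3, 0, 0]]}])
out = list(make_processed_iterator(raw))
print(len(out))
# prints: 2
```

Because it is a generator, only one microbatch is processed (and held in memory) at a time, which matters when microbatches are large.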
- nemo_rl.models.automodel.data.get_microbatch_iterator(
- data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- cfg: dict[str, Any],
- mbs: int,
- dp_mesh: Any,
- tokenizer: transformers.AutoTokenizer,
- cp_size: int = 1,
- )
Create a processed microbatch iterator based on the configured batching strategy.
- Parameters:
data – Full dataset to iterate over
cfg – Configuration dictionary (enable_seq_packing is inferred from cfg["sequence_packing"]["enabled"])
mbs – Microbatch size
dp_mesh – Data parallel mesh
tokenizer – Tokenizer for processing
cp_size – Context parallel size
- Returns:
Tuple of (processed_microbatch_iterator, iterator_length)
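Returning the iterator together with its length lets the training loop drive a fixed number of steps without consuming the iterator first. A minimal sketch of that return shape, with simple chunking standing in for the real strategy selection and list-of-dicts standing in for `BatchedDataDict`:

```python
from typing import Any, Iterator

def get_microbatch_iterator_sketch(
    data: list[dict[str, Any]], mbs: int
) -> tuple[Iterator[list[dict[str, Any]]], int]:
    # Chunk the full batch into microbatches of size `mbs` and return the
    # iterator together with its length, so callers can size their loop.
    chunks = [data[i : i + mbs] for i in range(0, len(data), mbs)]
    return iter(chunks), len(chunks)

it, n = get_microbatch_iterator_sketch([{"x": i} for i in range(5)], mbs=2)
print(n)
# prints: 3  (chunks of sizes 2, 2, 1)
```

Note that with sequence packing enabled the real iterator length generally differs from `len(data) / mbs`, which is exactly why the length is returned explicitly.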
- nemo_rl.models.automodel.data.process_microbatch(
- mb: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- tokenizer: transformers.AutoTokenizer,
- enable_seq_packing: bool,
- cfg: dict[str, Any],
- cp_size: int,
- )
Process a microbatch and prepare inputs for the model forward pass.
- Parameters:
mb – Microbatch data
tokenizer – Tokenizer for padding value
enable_seq_packing – Whether sequence packing is enabled
cfg – Configuration dictionary
cp_size – Context parallel size
- Returns:
ProcessedInputs containing all tensors and metadata for forward pass
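One core piece of this processing, right-padding with the tokenizer's pad value and building the matching attention mask, can be sketched without dependencies. This is a hypothetical simplification of the unpacked path (no sequence packing, no context parallel), not the library's actual code:

```python
def pad_microbatch(
    input_ids: list[list[int]], pad_token_id: int
) -> tuple[list[list[int]], list[list[int]]]:
    # Right-pad every sequence to the longest one in the microbatch using
    # the tokenizer's pad token, and build the matching attention mask
    # (1 for real tokens, 0 for padding).
    max_len = max(len(seq) for seq in input_ids)
    padded = [seq + [pad_token_id] * (max_len - len(seq)) for seq in input_ids]
    mask = [[1] * len(seq) + [0] * (max_len - len(seq)) for seq in input_ids]
    return padded, mask

padded, mask = pad_microbatch([[1, 2, 3], [4]], pad_token_id=0)
print(padded)  # prints: [[1, 2, 3], [4, 0, 0]]
print(mask)    # prints: [[1, 1, 1], [1, 0, 0]]
```

This is why the tokenizer is a parameter here: it supplies the pad value, even though no tokenization happens at this stage.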
- nemo_rl.models.automodel.data.process_global_batch(
- data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
- loss_fn: nemo_rl.algorithms.interfaces.LossFunction,
- dp_group: torch.distributed.ProcessGroup,
- *,
- batch_idx: int,
- batch_size: int,
- )
Process a global batch and compute normalization factors.
- Parameters:
data – Full dataset
loss_fn – Loss function (used to check loss type)
dp_group – Data parallel process group (for consistency with Megatron naming)
batch_idx – Index of batch to extract
batch_size – Size of batch to extract
- Returns:
A dictionary containing:
batch – The extracted batch
global_valid_seqs – Number of valid sequences across all ranks
global_valid_toks – Number of valid tokens across all ranks
- Return type:
dict
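The two normalization factors can be illustrated with a dependency-free sketch. The validity rules (a token is valid when its mask entry is 1; a sequence is valid when it has any unmasked token) are assumptions for illustration, and summing across ranks in plain Python stands in for the all-reduce over the data-parallel process group:

```python
def global_norm_factors(
    masks_per_rank: list[list[list[int]]],
) -> tuple[int, int]:
    # Per rank: a sequence is valid if any token is unmasked; a token is
    # valid if its mask entry is 1. Summing across ranks stands in for the
    # all-reduce over the data-parallel process group.
    valid_seqs = sum(
        sum(1 for seq in rank if any(seq)) for rank in masks_per_rank
    )
    valid_toks = sum(sum(sum(seq) for seq in rank) for rank in masks_per_rank)
    return valid_seqs, valid_toks

# Two DP ranks with two sequences each; one sequence is fully masked out.
seqs, toks = global_norm_factors(
    [[[1, 1, 0], [1, 0, 0]], [[0, 0, 0], [1, 1, 1]]]
)
print(seqs, toks)
# prints: 3 6
```

Computing these factors globally (rather than per rank) is what keeps the loss normalization consistent when ranks hold different numbers of valid tokens.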