nemo_rl.models.automodel.data#

Data processing utilities for automodel training and inference.

Module Contents#

Classes#

ProcessedInputs

Processed microbatch inputs ready for model forward pass.

ProcessedMicrobatch

Container for a processed microbatch ready for model forward pass.

Functions#

make_processed_microbatch_iterator

Wrap a raw microbatch iterator to yield processed microbatches.

get_microbatch_iterator

Create processed microbatch iterator based on batching strategy.

process_microbatch

Process a microbatch and prepare inputs for model forward.

process_global_batch

Process a global batch and compute normalization factors.

API#

class nemo_rl.models.automodel.data.ProcessedInputs#

Processed microbatch inputs ready for model forward pass.

This structure contains all necessary tensors and metadata for a forward pass, including context parallel buffers and flash attention configuration.

input_ids: torch.Tensor#

None

seq_len: int#

None

attention_mask: Optional[torch.Tensor]#

None

position_ids: Optional[torch.Tensor]#

None

flash_attn_kwargs: dict[str, Any]#

'field(…)'

vlm_kwargs: dict[str, Any]#

'field(…)'

cp_buffers: list[torch.Tensor]#

'field(…)'

seq_index: Optional[torch.Tensor]#

None

property has_context_parallel: bool#

Check if context parallel is enabled.

property has_flash_attention: bool#

Check if flash attention is configured.

Works for both empty dict {} and dataclass objects like FlashAttnKwargs.

property is_multimodal: bool#

Check if this is a multimodal input.
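The three properties above can be illustrated with a minimal stand-in. The sketch below is hypothetical: `ProcessedInputsSketch` mirrors the documented fields using plain lists instead of `torch.Tensor`, and the property logic is a plausible reading of the descriptions (CP buffers present, non-empty flash-attention kwargs, non-empty VLM kwargs), not the actual implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Optional

# Hypothetical mirror of ProcessedInputs; lists stand in for torch.Tensor
# so the sketch runs without a GPU stack.
@dataclass
class ProcessedInputsSketch:
    input_ids: list[int]
    seq_len: int
    attention_mask: Optional[list[int]] = None
    position_ids: Optional[list[int]] = None
    flash_attn_kwargs: Any = field(default_factory=dict)
    vlm_kwargs: dict[str, Any] = field(default_factory=dict)
    cp_buffers: list[Any] = field(default_factory=list)
    seq_index: Optional[list[int]] = None

    @property
    def has_context_parallel(self) -> bool:
        # Context parallel is active when CP buffers were registered.
        return len(self.cp_buffers) > 0

    @property
    def has_flash_attention(self) -> bool:
        # True for a non-empty dict or any non-dict (dataclass-like) object.
        if isinstance(self.flash_attn_kwargs, dict):
            return len(self.flash_attn_kwargs) > 0
        return self.flash_attn_kwargs is not None

    @property
    def is_multimodal(self) -> bool:
        # Multimodal inputs carry extra VLM keyword arguments.
        return len(self.vlm_kwargs) > 0

inputs = ProcessedInputsSketch(input_ids=[1, 2, 3], seq_len=3)
print(inputs.has_context_parallel, inputs.has_flash_attention, inputs.is_multimodal)
# → False False False
```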

class nemo_rl.models.automodel.data.ProcessedMicrobatch#

Container for a processed microbatch ready for model forward pass.

This dataclass holds both the original data dictionary and the processed tensors needed for the automodel forward pass. It follows the same pattern as ProcessedMicrobatch in nemo_rl/models/megatron/data.py.

Attributes:
  • data_dict – The original BatchedDataDict containing raw batch data

  • processed_inputs – ProcessedInputs containing all tensors for forward pass

  • original_batch_size – Original batch size before any packing

  • original_seq_len – Original sequence length before any packing

data_dict: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any]#

None

processed_inputs: nemo_rl.models.automodel.data.ProcessedInputs#

None

original_batch_size: int#

None

original_seq_len: int#

None

nemo_rl.models.automodel.data.make_processed_microbatch_iterator(
raw_iterator: Iterator[nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any]],
tokenizer: transformers.AutoTokenizer,
cfg: dict[str, Any],
cp_size: int,
) → Iterator[nemo_rl.models.automodel.data.ProcessedMicrobatch]#

Wrap a raw microbatch iterator to yield processed microbatches.

This function takes a raw iterator that yields BatchedDataDict objects and wraps it to yield ProcessedMicrobatch objects that contain both the original data and the processed tensors ready for model forward pass.

Parameters:
  • raw_iterator – Iterator yielding raw BatchedDataDict microbatches

  • tokenizer – Tokenizer for processing

  • cfg – Configuration dictionary (enable_seq_packing is inferred from cfg["sequence_packing"]["enabled"])

  • cp_size – Context parallel size

Yields:

ProcessedMicrobatch objects containing processed tensors ready for model forward
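The wrapping pattern can be sketched generically. Everything below is a hypothetical stand-in: a "raw microbatch" is a plain dict and `process` merely records the sequence length, mimicking the shape of the wrapper without the real tokenizer, config, or tensors.

```python
from typing import Any, Iterator

# Hypothetical stand-in for the per-microbatch processing step.
def process(mb: dict[str, Any]) -> dict[str, Any]:
    return {"data_dict": mb, "seq_len": len(mb["input_ids"])}

def wrap_iterator(raw: Iterator[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    # Lazily process each raw microbatch as it is consumed, so the wrapper
    # holds at most one microbatch in flight at a time.
    for mb in raw:
        yield process(mb)

raw = iter([{"input_ids": [1, 2, 3]}, {"input_ids": [4, 5]}])
processed = list(wrap_iterator(raw))
print([p["seq_len"] for p in processed])  # → [3, 2]
```

The generator keeps the original data dict alongside the processed fields, which is the same pairing ProcessedMicrobatch provides.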

nemo_rl.models.automodel.data.get_microbatch_iterator(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
cfg: dict[str, Any],
mbs: int,
dp_mesh: Any,
tokenizer: transformers.AutoTokenizer,
cp_size: int = 1,
) → tuple[Iterator[nemo_rl.models.automodel.data.ProcessedMicrobatch], int]#

Create processed microbatch iterator based on batching strategy.

Parameters:
  • data – Full dataset to iterate over

  • cfg – Configuration dictionary (enable_seq_packing is inferred from cfg["sequence_packing"]["enabled"])

  • mbs – Microbatch size

  • dp_mesh – Data parallel mesh

  • tokenizer – Tokenizer for processing

  • cp_size – Context parallel size

Returns:

Tuple of (processed_microbatch_iterator, iterator_length)
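The returned (iterator, length) pair lets callers set up loss accumulation or progress accounting before consuming any microbatches. A hypothetical sketch of the splitting strategy, using a plain list in place of a BatchedDataDict:

```python
from typing import Any, Iterator

# Hypothetical sketch: chunk a batch into microbatches of size mbs and
# return the iterator together with its length up front.
def microbatch_iterator_sketch(
    data: list[dict[str, Any]], mbs: int
) -> tuple[Iterator[list[dict[str, Any]]], int]:
    num_microbatches = (len(data) + mbs - 1) // mbs  # ceiling division

    def gen() -> Iterator[list[dict[str, Any]]]:
        for start in range(0, len(data), mbs):
            yield data[start : start + mbs]

    return gen(), num_microbatches

samples = [{"input_ids": [i]} for i in range(5)]
iterator, length = microbatch_iterator_sketch(samples, mbs=2)
batches = list(iterator)
print(length, [len(b) for b in batches])  # → 3 [2, 2, 1]
```

Ceiling division covers a ragged final microbatch; the real function additionally accounts for sequence packing and the data-parallel mesh.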

nemo_rl.models.automodel.data.process_microbatch(
mb: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
tokenizer: transformers.AutoTokenizer,
enable_seq_packing: bool,
cfg: dict[str, Any],
cp_size: int,
) → nemo_rl.models.automodel.data.ProcessedInputs#

Process a microbatch and prepare inputs for model forward.

Parameters:
  • mb – Microbatch data

  • tokenizer – Tokenizer for padding value

  • enable_seq_packing – Whether sequence packing is enabled

  • cfg – Configuration dictionary

  • cp_size – Context parallel size

Returns:

ProcessedInputs containing all tensors and metadata for forward pass
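One step this function needs the tokenizer for is padding. The sketch below is a hypothetical illustration of that step only: right-padding variable-length sequences to a common length with the tokenizer's pad id and building the matching attention mask, with plain lists standing in for tensors.

```python
# Hypothetical padding step: right-pad each sequence to the batch max
# length with pad_id, and mark real tokens with 1 / padding with 0.
def pad_microbatch(
    seqs: list[list[int]], pad_id: int
) -> tuple[list[list[int]], list[list[int]]]:
    max_len = max(len(s) for s in seqs)
    input_ids = [s + [pad_id] * (max_len - len(s)) for s in seqs]
    attention_mask = [[1] * len(s) + [0] * (max_len - len(s)) for s in seqs]
    return input_ids, attention_mask

ids, mask = pad_microbatch([[7, 8, 9], [4, 5]], pad_id=0)
print(ids)   # → [[7, 8, 9], [4, 5, 0]]
print(mask)  # → [[1, 1, 1], [1, 1, 0]]
```

When enable_seq_packing is set, sequences are concatenated instead of padded, which is why ProcessedInputs records both flash-attention kwargs and the original sequence boundaries.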

nemo_rl.models.automodel.data.process_global_batch(
data: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
loss_fn: nemo_rl.algorithms.interfaces.LossFunction,
dp_group: torch.distributed.ProcessGroup,
*,
batch_idx: int,
batch_size: int,
) → dict[str, Any]#

Process a global batch and compute normalization factors.

Parameters:
  • data – Full dataset

  • loss_fn – Loss function (used to check loss type)

  • dp_group – Data parallel process group (for consistency with Megatron naming)

  • batch_idx – Index of batch to extract

  • batch_size – Size of batch to extract

Returns:

Dictionary containing:

  • batch: The extracted batch

  • global_valid_seqs: Number of valid sequences across all ranks

  • global_valid_toks: Number of valid tokens across all ranks
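The normalization factors can be sketched as follows. This is hypothetical: in the real function each rank would all-reduce its local counts over dp_group via torch.distributed; here the reduction is simulated by summing over a list of per-rank token masks.

```python
# Hypothetical sketch of the global normalization factors: a sequence is
# "valid" if it has at least one unmasked token, and valid tokens are the
# unmasked positions summed across all data-parallel ranks.
def global_norm_factors(per_rank_masks: list[list[list[int]]]) -> dict[str, int]:
    global_valid_seqs = 0
    global_valid_toks = 0
    for rank_masks in per_rank_masks:  # stands in for the dp_group all-reduce
        for seq_mask in rank_masks:
            if any(seq_mask):
                global_valid_seqs += 1
            global_valid_toks += sum(seq_mask)
    return {
        "global_valid_seqs": global_valid_seqs,
        "global_valid_toks": global_valid_toks,
    }

# Two ranks: rank 0 holds two sequences, rank 1 holds one.
factors = global_norm_factors([[[1, 1, 0], [1, 0, 0]], [[1, 1, 1]]])
print(factors)  # → {'global_valid_seqs': 3, 'global_valid_toks': 6}
```

Whether the loss is later divided by global_valid_seqs or global_valid_toks depends on the loss type, which is why loss_fn is passed in.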