Carregamento de Dados e Pipelines de Entrada

Visão geral

Um carregamento de dados (data loading) e pipeline de entrada (input pipeline) é tudo o que acontece entre o seu conjunto de dados (dataset) no armazenamento e os tensores (tensors) que chegam ao acelerador (accelerator) (GPU/TPU) para a passagem direta (forward pass). No aprendizado profundo (deep learning) moderno, esse pipeline costuma ser um fator de desempenho de primeira ordem: se ele não conseguir fornecer lotes (batches) rápido o suficiente, seus aceleradores caros ficam ociosos.

Um pipeline eficaz equilibra quatro objetivos:

  1. Correção: amostras e rótulos são pareados corretamente, as aumentações (augmentations) são válidas, e trabalhadores distribuídos (distributed workers) veem os dados pretendidos.
  2. Qualidade estatística: bom embaralhamento (shuffling) e comportamento de amostragem (sampling) ao longo das épocas (epochs) e entre trabalhadores distribuídos.
  3. Vazão (throughput) e latência (latency): os lotes chegam rápido o suficiente para manter a computação saturada.
  4. Reprodutibilidade (reproducibility) e robustez (robustness): o pipeline se comporta de forma previsível quando desejado e consegue se recuperar de falhas (veja Salvamento de Checkpoints e Tolerância a Falhas (Checkpointing & Fault Tolerance) e Reprodutibilidade).

Este artigo foca em considerações de embaralhamento, pré-processamento (preprocessing) e vazão, especialmente em cenários de treinamento em escala (training-at-scale).

Por que pipelines de entrada viram gargalos

O tempo de iteração de treinamento pode ser entendido como:

  • Tempo de computação: passagem direta + passagem inversa (backward) + passo do otimizador (optimizer step).
  • Tempo de entrada (input time): ler → decodificar (decode) → pré-processar/aumentar → criar lotes → transferir para o dispositivo.

Se isso acontece em série, seu tempo por passo é aproximadamente:

[ T_{\text{step}} \approx T_{\text{input}} + T_{\text{compute}} ]

Pipelines bem projetados fazem a sobreposição (produtor–consumidor), tornando o tempo por passo mais próximo de:

[ T_{\text{step}} \approx \max(T_{\text{input}}, T_{\text{compute}}) ]

Em escala, o tempo de entrada tende a aumentar porque:

  • Você lê mais dados por segundo (tamanhos de lote global (global batch sizes) maiores).
  • O armazenamento é remoto (armazenamentos de objetos (object stores), sistemas de arquivos em rede (network file systems)).
  • O pré-processamento na CPU (decodificação, tokenização (tokenization), aumentação) vira um gargalo.
  • A fragmentação (sharding)/embaralhamento distribuídos adicionam restrições de coordenação.

O objetivo de engenharia é fazer T_input ser confiavelmente menor que T_compute, e estável (baixa oscilação (jitter)), para que GPUs/TPUs permaneçam ocupadas.

Um modelo mental: filas produtor–consumidor e “para onde o tempo vai”

A maioria dos carregadores de alto desempenho implementa um pipeline como:

  1. Leitura (filesystem/object store)
  2. Decodificação (JPEG/PNG, registros comprimidos, parsing de protobuf etc.)
  3. Transformação (tokenizar, normalizar, aumentar)
  4. Criação de lotes (pad/pack, collate)
  5. Transferência (cópia host→device, possivelmente async)
  6. Treino (computação)

Ideia-chave: adicionar paralelismo (parallelism) (múltiplos trabalhadores/threads), bufferização (buffering) (filas) e pré-busca (prefetching) (preparar o próximo lote enquanto treina no lote atual). Mas a bufferização deve ser projetada com cuidado para preservar propriedades estatísticas (especialmente o embaralhamento).

Representação de dados: formatos importam (muito)

O formato de arquivo determina quão eficientemente você consegue ler e embaralhar. Padrões comuns:

Muitos arquivos pequenos (ex.: uma imagem por arquivo)

Prós

  • Fácil de inspecionar e manipular.
  • Natural para conjuntos de dados distribuídos como pastas de imagens.

Contras

  • Alto overhead de metadados (stat calls, varredura de diretórios).
  • Baixa vazão em sistemas de arquivos em rede/armazenamentos de objetos.
  • Mais difícil fazer embaralhamento global eficiente.

Arquivos de “registros” em fragmentos (fragmentos (shards)) (recomendado em escala)

Exemplos: TFRecord, WebDataset com fragmentos .tar, LMDB, Parquet/Arrow.

Prós

  • Leituras sequenciais (sequential reads) e pré-busca eficientes.
  • Melhor amortização do overhead de E/S (I/O).
  • O embaralhamento pode ser feito no nível de fragmento e também dentro do fragmento.

Contras

  • Requer uma etapa de ingestão para criar fragmentos.
  • O acesso aleatório (random access) pode ser mais difícil a menos que haja indexação.

Regra prática: em escala, prefira fragmentos com tamanho de dezenas a centenas de MB (ou mais) para que leituras sequenciais dominem o overhead, mas não tão grandes a ponto de o desequilíbrio por fragmento virar um problema.

Trade-offs de compressão

  • A compressão reduz o volume de E/S, mas aumenta o custo de decodificação na CPU.
  • Se você está limitado por CPU, compressão pesada pode prejudicar a vazão.
  • Se você está limitado por E/S (armazenamento remoto), a compressão pode ajudar.

Uma abordagem prática é medir de ponta a ponta: largura de banda do armazenamento, decodificação na CPU e utilização da GPU em conjunto.

Embaralhamento: qualidade estatística vs desempenho

O embaralhamento afeta convergência e generalização ao reduzir correlações entre amostras consecutivas. Mas um embaralhamento “perfeito” pode ser caro.

Níveis de embaralhamento

  1. Embaralhamento no nível de fragmento: embaralhar a ordem dos fragmentos a cada época.
  2. Embaralhamento dentro do fragmento: embaralhar exemplos dentro de um fragmento (exige bufferização ou indexação).
  3. Embaralhamento por buffer (buffer shuffle) (aproximação de embaralhamento global): fazer streaming dos dados e embaralhar dentro de um buffer finito.

Em pipelines de streaming, o embaralhamento por buffer é comum: manter um buffer de tamanho B, amostrar aleatoriamente um elemento para saída e substituí-lo pelo próximo elemento de entrada. Um B maior melhora a aleatoriedade, mas aumenta memória e latência de inicialização.

Trade-off:

  • Buffer pequeno → mais rápido, mas mais correlações locais.
  • Buffer grande → melhor qualidade de embaralhamento, mas mais RAM e latência.

Embaralhamento em treinamento distribuído

Com treinamento em paralelismo de dados (data parallel) (veja Treinamento Distribuído (Distributed Training) e Paralelismo de Dados/Modelo/Pipeline (Data/Model/Pipeline Parallelism)), cada trabalhador (rank (rank)) precisa receber um subconjunto diferente de dados a cada passo.

Duas estratégias comuns:

  • Embaralhar globalmente e então fragmentar: conceitualmente ideal, mas caro/impraticável para streaming.
  • Fragmentar e então embaralhar localmente: cada trabalhador lê um subconjunto distinto de fragmentos e embaralha localmente.

A segunda é típica em escala. Para preservar a qualidade do treinamento:

  • Embaralhe a ordem dos fragmentos com uma semente diferente a cada época.
  • Garanta que os fragmentos sejam suficientemente numerosos e misturados para reduzir deriva de distribuição por rank.

Garantindo “cada amostra vista uma vez por época”

Se você precisa de semântica estrita de época (comum em aprendizado supervisionado), deve coordenar a fragmentação para que a união dos fluxos dos trabalhadores cubra o conjunto de dados sem sobreposições.

Exemplos:

  • PyTorch: DistributedSampler(dataset, shuffle=True) lida com reembaralhamento e particionamento por época.
  • TF: dataset.shard(num_workers, worker_id) combinado com shuffle(...) e sementes bem definidas.

Note que alguns formatos de streaming (ex.: fluxos infinitos de WebDataset) podem relaxar a semântica estrita de “época = passagem completa” em favor de vazão.

Amostragem ponderada e desbalanceamento de classes

Embaralhamento não é o mesmo que amostragem. Para conjuntos de dados desbalanceados, você pode usar:

  • Amostragem aleatória ponderada (weighted random sampling) (pode distorcer a semântica de época).
  • Oversampling de classes minoritárias.
  • Amostragem balanceada por lote (cada lote tem uma cota por classe).

Em escala, implemente isso com cuidado — abordagens ingênuas podem introduzir overhead de sincronização ou reduzir a qualidade do embaralhamento (ex.: reamostrar repetidamente um subconjunto minoritário pequeno).

Pré-processamento e aumentação: offline vs online

Pré-processamento inclui decodificação, normalização, extração de características, tokenização, recorte, redimensionamento e aumentação.

Pré-processamento offline (pré-computar uma vez)

Exemplos:

  • Pré-tokenizar texto em IDs inteiros.
  • Redimensionar imagens para um tamanho padrão.
  • Pré-computar espectrogramas para áudio.

Prós

  • Vazão de treinamento muito maior (menos trabalho na CPU).
  • Mais determinístico e mais fácil de depurar.

Contras

  • Menos flexibilidade (mudar aumentações exige regenerar os dados).
  • Maior pegada de armazenamento (especialmente se armazenar dados decodificados).

Pré-processamento online (computar em tempo real)

Exemplos:

  • Recortes/flip/jitter de cor aleatórios.
  • Mascaramento dinâmico para modelagem de linguagem.
  • MixUp/CutMix.

Prós

  • Mais diversidade de dados, melhora a generalização.
  • Não é necessário armazenar múltiplas versões aumentadas.

Contras

  • Gargalos na CPU são comuns.
  • Pode introduzir não determinismo e oscilação.

Abordagem híbrida: realize etapas caras e determinísticas offline (decodificar/redimensionar/tokenizar) e mantenha aleatoriedade barata online (pequenas aumentações, mascaramento).

Pré-processamento na CPU vs na GPU

Se a CPU virar o gargalo, considere:

  • Mover transformações para a GPU (quando disponível e correto).
  • Usar bibliotecas dedicadas (ex.: NVIDIA DALI para pipelines de imagem/áudio).
  • Vetorizar operações (transformações em lote em vez de loops em Python por exemplo).

Um insight prático importante: trabalho por amostra no nível do Python (loops, transformações do PIL) frequentemente vira o gargalo muito antes da E/S bruta. Prefira kernels vetorizados (vectorized kernels) e transformações compiladas (compiled transforms).

Engenharia de vazão: sobreposição, paralelismo e memória

Carregamento de dados em paralelo (parallel data loading)

A maioria dos frameworks permite múltiplos processos/threads de trabalhadores (workers).

  • Mais trabalhadores podem melhorar a vazão até certo ponto.
  • Trabalhadores demais podem causar contenção (buscas em disco, trashing de cache de CPU, trocas de contexto).
  • Observe retornos decrescentes e aumento de variabilidade.

Ajustes comuns

  • PyTorch DataLoader(num_workers=..., prefetch_factor=..., persistent_workers=True)
  • TF dataset.map(..., num_parallel_calls=tf.data.AUTOTUNE).prefetch(AUTOTUNE)

Pré-busca e bufferização

A pré-busca cria uma fila de lotes prontos para treino. Isso:

  • Sobrepõe entrada com computação.
  • Suaviza travamentos transitórios de E/S.
  • Aumenta o uso de memória.

Para utilização estável da GPU, em geral você quer pelo menos 1–2 lotes em pré-busca por dispositivo, mas o valor certo depende do tempo por passo e da oscilação.

Transferência host→dispositivo e memória fixada

Para treino em GPU, cópias do host para o dispositivo podem virar overhead visível.

No PyTorch:

  • pin_memory=True habilita memória fixada (pinned memory) (memória do host com páginas bloqueadas) para acelerar transferências por DMA.
  • Use non_blocking=True ao mover tensores para a GPU para sobrepor cópias com computação (quando possível).

Exemplo de padrão:

for batch in loader:
    inputs, labels = batch
    inputs = inputs.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)
    loss = model(inputs, labels)
    ...

Isso funciona melhor quando o loader produz tensores em memória fixada e o stream da GPU tem espaço para sobrepor cópias.

Colação: padding, bucketing e packing

Para entradas de comprimento variável (texto, áudio), padding ingênuo desperdiça computação e largura de banda.

Técnicas:

  • Agrupamento por faixas (bucketing): agrupar exemplos de comprimentos similares no mesmo lote para reduzir padding.
  • Loteamento dinâmico (dynamic batching): criar lotes por tokens (tokens) ou frames (frames) em vez de por exemplos.
  • Empacotamento (packing): concatenar múltiplas sequências em uma sequência mais longa (comum no pré-treinamento da Arquitetura Transformer (Transformer Architecture)) para reduzir padding e melhorar a utilização.

Empacotamento pode melhorar a vazão drasticamente, mas altera os limites efetivos das amostras e pode exigir máscaras de atenção e mascaramento de perda.

Cacheamento

Cacheamento pode ser:

  • Em memória (mais rápido, limitado por RAM).
  • Em SSD local (comum em clusters).
  • Cache do framework (ex.: tf.data.Dataset.cache()).

Cacheamento é especialmente eficaz quando:

  • Seu conjunto de dados cabe no SSD local mas não na RAM.
  • Você faz etapas caras de decodificação/parse que quer evitar repetir.

Cuidado: cachear depois de aumentação aleatória vai “congelar” a aleatoriedade, a menos que o cache seja colocado antes no pipeline.

Exemplos práticos

PyTorch: DataLoader eficiente para classificação de imagens

import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

train_tfms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406),
                         std=(0.229, 0.224, 0.225)),
])

ds = datasets.ImageFolder("/data/imagenet/train", transform=train_tfms)

loader = DataLoader(
    ds,
    batch_size=256,
    shuffle=True,                 # local shuffle; for DDP use DistributedSampler
    num_workers=8,                # tune per machine
    pin_memory=True,
    persistent_workers=True,      # avoid worker restart overhead each epoch
    prefetch_factor=4,            # batches prefetched per worker
    drop_last=True,
)

device = "cuda"
for images, labels in loader:
    images = images.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)
    # training step...

Notas

  • Para treinamento distribuído, substitua shuffle=True por um DistributedSampler e chame sampler.set_epoch(epoch) a cada época para reembaralhar de forma determinística entre ranks.
  • Se a CPU estiver saturada, considere menos trabalhadores mais “pesados”, bibliotecas de decodificação mais rápidas, ou migrar para formatos de registros fragmentados.

TensorFlow: pipeline `tf.data` com map paralelo e prefetch

import tensorflow as tf

files = tf.data.Dataset.list_files("/data/shards/train-*.tfrecord", shuffle=True)

def parse_example(record_bytes):
    features = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64),
    }
    ex = tf.io.parse_single_example(record_bytes, features)
    img = tf.io.decode_jpeg(ex["image"], channels=3)
    img = tf.image.resize(img, [224, 224])
    img = tf.cast(img, tf.float32) / 255.0
    img = tf.image.random_flip_left_right(img)
    return img, ex["label"]

ds = (tf.data.TFRecordDataset(files, num_parallel_reads=tf.data.AUTOTUNE)
      .shuffle(50_000)  # buffer shuffle; tune for memory/statistics
      .map(parse_example, num_parallel_calls=tf.data.AUTOTUNE)
      .batch(256, drop_remainder=True)
      .prefetch(tf.data.AUTOTUNE))

Notas

  • num_parallel_reads habilita interleaving entre fragmentos, melhorando vazão e aleatoriedade.
  • Posicione .shuffle(...) com intenção: embaralhar depois do parsing pode embaralhar exemplos individuais; embaralhar antes pode embaralhar a ordem dos arquivos, mas não misturar dentro dos arquivos.

Pipelines de entrada distribuídos: fragmentação, sementes e semântica de “exatamente uma vez”

Estratégias de fragmentação

Em treinamento com múltiplos trabalhadores, cada trabalhador deve ler um subconjunto distinto para evitar duplicatas e computação desperdiçada.

Abordagens comuns:

  • Fragmentação baseada em arquivos (file-based sharding): atribuir fragmentos aos trabalhadores de forma determinística (ex.: o trabalhador i lê cada N-ésimo fragmento). Simples e escalável.
  • Fragmentação baseada em índices (index-based sharding): para conjuntos com acesso aleatório, particionar índices entre trabalhadores.
  • Ingestão baseada em serviço (service-based ingestion): um serviço de dados gerencia distribuição e balanceamento de carga (mais complexo, pode melhorar a utilização).

Definição de sementes e determinismo

Embaralhamento e aumentações aleatórias usam geradores de números aleatórios em múltiplas camadas:

  • Python, NumPy
  • RNG do framework
  • RNG por trabalhador
  • Kernels de GPU (às vezes não determinísticos)

Para reprodutibilidade entre execuções, você precisa de sementes consistentes e particionamento de dados consistente. Veja Reprodutibilidade para o panorama mais amplo.

Um padrão prático é:

  • Usar uma semente global.
  • Derivar semente por época: seed_epoch = hash(global_seed, epoch).
  • Derivar semente por trabalhador: seed_worker = hash(seed_epoch, worker_id).

Observe que determinismo estrito pode reduzir desempenho (ex.: forçar kernels determinísticos na GPU).

Tolerância a falhas e retomada

Em escala, jobs falham. Um pipeline robusto deve suportar retomada sem:

  • repetir dados demais (a menos que isso seja aceitável),
  • pular segmentos grandes de forma inesperada.

Isso é difícil em pipelines de streaming sem índices de amostras estáveis. Abordagens incluem:

  • armazenar estado do cursor de dados (data cursor state) (id do fragmento + offset),
  • usar fragmentação determinística baseada em época/passo (epoch/step-based deterministic sharding) para recomputar quais amostras pertencem a qual passo,
  • confiar em semântica aproximada (comum em pré-treinamento em grande escala).

Veja Salvamento de Checkpoints e Tolerância a Falhas para considerações do estado de treinamento.

Medindo e depurando o desempenho do pipeline

O que medir

  • Utilização da GPU/TPU: utilização baixa frequentemente indica gargalo de entrada.
  • Decomposição do tempo por passo: tempo esperando dados vs computação.
  • Utilização de CPU por trabalhador: os trabalhadores estão no limite?
  • Vazão de E/S: taxas de leitura de disco/rede, taxas de acerto de cache.
  • Profundidade das filas: as filas de pré-busca estão esvaziando?

Sintomas comuns e correções

Sintoma: GPUs ficam ociosas periodicamente (utilização em dente de serra)

  • Aumente o tamanho do buffer de pré-busca.
  • Reduza a variância de pré-processamento por lote.
  • Mova dados para armazenamento local mais rápido ou habilite cacheamento.

Sintoma: CPU no limite, GPU subutilizada

  • Reduza overhead de Python: evite código Python por amostra.
  • Use transformações vetorizadas ou operadores compilados.
  • Pré-compute etapas caras offline.
  • Considere pré-processamento acelerado por GPU.

Sintoma: sistema de arquivos em rede/armazenamento de objetos com throttling

  • Use leituras sequenciais maiores (dados fragmentados).
  • Adicione cacheamento em SSD local.
  • Aumente paralelismo de leitura com cuidado (evite “efeito manada (thundering herd)”).

Sintoma: convergência piora após otimizar o pipeline

  • Verifique a qualidade do embaralhamento (buffer pequeno demais, desequilíbrio por rank).
  • Garanta que aumentações não mudaram a semântica.
  • Verifique se não há duplicatas/omissões entre trabalhadores.

Microbenchmark do carregador

Um teste simples, mas eficaz, é rodar o carregador sem computação do modelo e medir lotes/seg, depois comparar com a vazão requerida:

[ \text{required batches/sec} \approx \frac{1}{T_{\text{compute per step}}} ]

Se o carregador sozinho não consegue exceder essa taxa (idealmente por uma margem confortável), o treinamento ficará limitado pela entrada.

Boas práticas (checklist)

Embaralhamento e amostragem

  • Prefira conjuntos de dados fragmentados e embaralhe tanto no nível de fragmento quanto no nível de exemplo.
  • Use um buffer de embaralhamento grande o suficiente para quebrar correlações locais.
  • Em treinamento distribuído, garanta fragmentação sem sobreposição e reembaralhamento por época.

Pré-processamento

  • Leve o pré-processamento determinístico caro para offline (tokenização, redimensionamento, parsing).
  • Mantenha aumentações online baratas e paralelizáveis.
  • Evite loops em Python no caminho crítico.

Vazão

  • Ajuste num_workers / chamadas paralelas com base nos núcleos de CPU e no comportamento do armazenamento.
  • Use pré-busca e trabalhadores persistentes para reduzir overhead e oscilação.
  • Use memória fixada e transferências assíncronas para o dispositivo quando aplicável.

Escala e robustez

  • Projete para a realidade de armazenamento remoto: leituras maiores, menos arquivos, cacheamento.
  • Faça com que aleatoriedade e fragmentação sejam controláveis por sementes para depuração e reprodutibilidade.
  • Considere semântica de retomada cedo se os treinos forem longos.

Como isso se encaixa em treinamento em escala

Pipelines de entrada interagem com quase todas as outras decisões de escalonamento:

Um pipeline de dados bem projetado não é apenas “E/S mais rápida” — é um sistema cuidadosamente construído que preserva propriedades estatísticas enquanto mantém aceleradores alimentados de forma consistente.