Carregamento de Dados e Pipelines de Entrada
Visão geral
Um carregamento de dados (data loading) e pipeline de entrada (input pipeline) é tudo o que acontece entre o seu conjunto de dados (dataset) no armazenamento e os tensores (tensors) que chegam ao acelerador (accelerator) (GPU/TPU) para a passagem direta (forward pass). No aprendizado profundo (deep learning) moderno, esse pipeline costuma ser um fator de desempenho de primeira ordem: se ele não conseguir fornecer lotes (batches) rápido o suficiente, seus aceleradores caros ficam ociosos.
Um pipeline eficaz equilibra quatro objetivos:
- Correção: amostras e rótulos são pareados corretamente, as aumentações (augmentations) são válidas, e trabalhadores distribuídos (distributed workers) veem os dados pretendidos.
- Qualidade estatística: bom embaralhamento (shuffling) e comportamento de amostragem (sampling) ao longo das épocas (epochs) e entre trabalhadores distribuídos.
- Vazão (throughput) e latência (latency): os lotes chegam rápido o suficiente para manter a computação saturada.
- Reprodutibilidade (reproducibility) e robustez (robustness): o pipeline se comporta de forma previsível quando desejado e consegue se recuperar de falhas (veja Salvamento de Checkpoints e Tolerância a Falhas (Checkpointing & Fault Tolerance) e Reprodutibilidade).
Este artigo foca em considerações de embaralhamento, pré-processamento (preprocessing) e vazão, especialmente em cenários de treinamento em escala (training-at-scale).
Por que pipelines de entrada viram gargalos
O tempo de iteração de treinamento pode ser entendido como:
- Tempo de computação: passagem direta + passagem inversa (backward) + passo do otimizador (optimizer step).
- Tempo de entrada (input time): ler → decodificar (decode) → pré-processar/aumentar → criar lotes → transferir para o dispositivo.
Se isso acontece em série, seu tempo por passo é aproximadamente:
[ T_{\text{step}} \approx T_{\text{input}} + T_{\text{compute}} ]
Pipelines bem projetados fazem a sobreposição (produtor–consumidor), tornando o tempo por passo mais próximo de:
[ T_{\text{step}} \approx \max(T_{\text{input}}, T_{\text{compute}}) ]
Em escala, o tempo de entrada tende a aumentar porque:
- Você lê mais dados por segundo (tamanhos de lote global (global batch sizes) maiores).
- O armazenamento é remoto (armazenamentos de objetos (object stores), sistemas de arquivos em rede (network file systems)).
- O pré-processamento na CPU (decodificação, tokenização (tokenization), aumentação) vira um gargalo.
- A fragmentação (sharding)/embaralhamento distribuídos adicionam restrições de coordenação.
O objetivo de engenharia é fazer T_input ser confiavelmente menor que T_compute, e estável (baixa oscilação (jitter)), para que GPUs/TPUs permaneçam ocupadas.
Um modelo mental: filas produtor–consumidor e “para onde o tempo vai”
A maioria dos carregadores de alto desempenho implementa um pipeline como:
- Leitura (filesystem/object store)
- Decodificação (JPEG/PNG, registros comprimidos, parsing de protobuf etc.)
- Transformação (tokenizar, normalizar, aumentar)
- Criação de lotes (pad/pack, collate)
- Transferência (cópia host→device, possivelmente async)
- Treino (computação)
Ideia-chave: adicionar paralelismo (parallelism) (múltiplos trabalhadores/threads), bufferização (buffering) (filas) e pré-busca (prefetching) (preparar o próximo lote enquanto treina no lote atual). Mas a bufferização deve ser projetada com cuidado para preservar propriedades estatísticas (especialmente o embaralhamento).
Representação de dados: formatos importam (muito)
O formato de arquivo determina quão eficientemente você consegue ler e embaralhar. Padrões comuns:
Muitos arquivos pequenos (ex.: uma imagem por arquivo)
Prós
- Fácil de inspecionar e manipular.
- Natural para conjuntos de dados distribuídos como pastas de imagens.
Contras
- Alto overhead de metadados (stat calls, varredura de diretórios).
- Baixa vazão em sistemas de arquivos em rede/armazenamentos de objetos.
- Mais difícil fazer embaralhamento global eficiente.
Arquivos de “registros” em fragmentos (fragmentos (shards)) (recomendado em escala)
Exemplos: TFRecord, WebDataset com fragmentos .tar, LMDB, Parquet/Arrow.
Prós
- Leituras sequenciais (sequential reads) e pré-busca eficientes.
- Melhor amortização do overhead de E/S (I/O).
- O embaralhamento pode ser feito no nível de fragmento e também dentro do fragmento.
Contras
- Requer uma etapa de ingestão para criar fragmentos.
- O acesso aleatório (random access) pode ser mais difícil a menos que haja indexação.
Regra prática: em escala, prefira fragmentos com tamanho de dezenas a centenas de MB (ou mais) para que leituras sequenciais dominem o overhead, mas não tão grandes a ponto de o desequilíbrio por fragmento virar um problema.
Trade-offs de compressão
- A compressão reduz o volume de E/S, mas aumenta o custo de decodificação na CPU.
- Se você está limitado por CPU, compressão pesada pode prejudicar a vazão.
- Se você está limitado por E/S (armazenamento remoto), a compressão pode ajudar.
Uma abordagem prática é medir de ponta a ponta: largura de banda do armazenamento, decodificação na CPU e utilização da GPU em conjunto.
Embaralhamento: qualidade estatística vs desempenho
O embaralhamento afeta convergência e generalização ao reduzir correlações entre amostras consecutivas. Mas um embaralhamento “perfeito” pode ser caro.
Níveis de embaralhamento
- Embaralhamento no nível de fragmento: embaralhar a ordem dos fragmentos a cada época.
- Embaralhamento dentro do fragmento: embaralhar exemplos dentro de um fragmento (exige bufferização ou indexação).
- Embaralhamento por buffer (buffer shuffle) (aproximação de embaralhamento global): fazer streaming dos dados e embaralhar dentro de um buffer finito.
Em pipelines de streaming, o embaralhamento por buffer é comum: manter um buffer de tamanho B, amostrar aleatoriamente um elemento para saída e substituí-lo pelo próximo elemento de entrada. Um B maior melhora a aleatoriedade, mas aumenta memória e latência de inicialização.
Trade-off:
- Buffer pequeno → mais rápido, mas mais correlações locais.
- Buffer grande → melhor qualidade de embaralhamento, mas mais RAM e latência.
Embaralhamento em treinamento distribuído
Com treinamento em paralelismo de dados (data parallel) (veja Treinamento Distribuído (Distributed Training) e Paralelismo de Dados/Modelo/Pipeline (Data/Model/Pipeline Parallelism)), cada trabalhador (rank (rank)) precisa receber um subconjunto diferente de dados a cada passo.
Duas estratégias comuns:
- Embaralhar globalmente e então fragmentar: conceitualmente ideal, mas caro/impraticável para streaming.
- Fragmentar e então embaralhar localmente: cada trabalhador lê um subconjunto distinto de fragmentos e embaralha localmente.
A segunda é típica em escala. Para preservar a qualidade do treinamento:
- Embaralhe a ordem dos fragmentos com uma semente diferente a cada época.
- Garanta que os fragmentos sejam suficientemente numerosos e misturados para reduzir deriva de distribuição por rank.
Garantindo “cada amostra vista uma vez por época”
Se você precisa de semântica estrita de época (comum em aprendizado supervisionado), deve coordenar a fragmentação para que a união dos fluxos dos trabalhadores cubra o conjunto de dados sem sobreposições.
Exemplos:
- PyTorch:
DistributedSampler(dataset, shuffle=True)lida com reembaralhamento e particionamento por época. - TF:
dataset.shard(num_workers, worker_id)combinado comshuffle(...)e sementes bem definidas.
Note que alguns formatos de streaming (ex.: fluxos infinitos de WebDataset) podem relaxar a semântica estrita de “época = passagem completa” em favor de vazão.
Amostragem ponderada e desbalanceamento de classes
Embaralhamento não é o mesmo que amostragem. Para conjuntos de dados desbalanceados, você pode usar:
- Amostragem aleatória ponderada (weighted random sampling) (pode distorcer a semântica de época).
- Oversampling de classes minoritárias.
- Amostragem balanceada por lote (cada lote tem uma cota por classe).
Em escala, implemente isso com cuidado — abordagens ingênuas podem introduzir overhead de sincronização ou reduzir a qualidade do embaralhamento (ex.: reamostrar repetidamente um subconjunto minoritário pequeno).
Pré-processamento e aumentação: offline vs online
Pré-processamento inclui decodificação, normalização, extração de características, tokenização, recorte, redimensionamento e aumentação.
Pré-processamento offline (pré-computar uma vez)
Exemplos:
- Pré-tokenizar texto em IDs inteiros.
- Redimensionar imagens para um tamanho padrão.
- Pré-computar espectrogramas para áudio.
Prós
- Vazão de treinamento muito maior (menos trabalho na CPU).
- Mais determinístico e mais fácil de depurar.
Contras
- Menos flexibilidade (mudar aumentações exige regenerar os dados).
- Maior pegada de armazenamento (especialmente se armazenar dados decodificados).
Pré-processamento online (computar em tempo real)
Exemplos:
- Recortes/flip/jitter de cor aleatórios.
- Mascaramento dinâmico para modelagem de linguagem.
- MixUp/CutMix.
Prós
- Mais diversidade de dados, melhora a generalização.
- Não é necessário armazenar múltiplas versões aumentadas.
Contras
- Gargalos na CPU são comuns.
- Pode introduzir não determinismo e oscilação.
Abordagem híbrida: realize etapas caras e determinísticas offline (decodificar/redimensionar/tokenizar) e mantenha aleatoriedade barata online (pequenas aumentações, mascaramento).
Pré-processamento na CPU vs na GPU
Se a CPU virar o gargalo, considere:
- Mover transformações para a GPU (quando disponível e correto).
- Usar bibliotecas dedicadas (ex.: NVIDIA DALI para pipelines de imagem/áudio).
- Vetorizar operações (transformações em lote em vez de loops em Python por exemplo).
Um insight prático importante: trabalho por amostra no nível do Python (loops, transformações do PIL) frequentemente vira o gargalo muito antes da E/S bruta. Prefira kernels vetorizados (vectorized kernels) e transformações compiladas (compiled transforms).
Engenharia de vazão: sobreposição, paralelismo e memória
Carregamento de dados em paralelo (parallel data loading)
A maioria dos frameworks permite múltiplos processos/threads de trabalhadores (workers).
- Mais trabalhadores podem melhorar a vazão até certo ponto.
- Trabalhadores demais podem causar contenção (buscas em disco, trashing de cache de CPU, trocas de contexto).
- Observe retornos decrescentes e aumento de variabilidade.
Ajustes comuns
- PyTorch
DataLoader(num_workers=..., prefetch_factor=..., persistent_workers=True) - TF
dataset.map(..., num_parallel_calls=tf.data.AUTOTUNE).prefetch(AUTOTUNE)
Pré-busca e bufferização
A pré-busca cria uma fila de lotes prontos para treino. Isso:
- Sobrepõe entrada com computação.
- Suaviza travamentos transitórios de E/S.
- Aumenta o uso de memória.
Para utilização estável da GPU, em geral você quer pelo menos 1–2 lotes em pré-busca por dispositivo, mas o valor certo depende do tempo por passo e da oscilação.
Transferência host→dispositivo e memória fixada
Para treino em GPU, cópias do host para o dispositivo podem virar overhead visível.
No PyTorch:
pin_memory=Truehabilita memória fixada (pinned memory) (memória do host com páginas bloqueadas) para acelerar transferências por DMA.- Use
non_blocking=Trueao mover tensores para a GPU para sobrepor cópias com computação (quando possível).
Exemplo de padrão:
for batch in loader:
inputs, labels = batch
inputs = inputs.to(device, non_blocking=True)
labels = labels.to(device, non_blocking=True)
loss = model(inputs, labels)
...
Isso funciona melhor quando o loader produz tensores em memória fixada e o stream da GPU tem espaço para sobrepor cópias.
Colação: padding, bucketing e packing
Para entradas de comprimento variável (texto, áudio), padding ingênuo desperdiça computação e largura de banda.
Técnicas:
- Agrupamento por faixas (bucketing): agrupar exemplos de comprimentos similares no mesmo lote para reduzir padding.
- Loteamento dinâmico (dynamic batching): criar lotes por tokens (tokens) ou frames (frames) em vez de por exemplos.
- Empacotamento (packing): concatenar múltiplas sequências em uma sequência mais longa (comum no pré-treinamento da Arquitetura Transformer (Transformer Architecture)) para reduzir padding e melhorar a utilização.
Empacotamento pode melhorar a vazão drasticamente, mas altera os limites efetivos das amostras e pode exigir máscaras de atenção e mascaramento de perda.
Cacheamento
Cacheamento pode ser:
- Em memória (mais rápido, limitado por RAM).
- Em SSD local (comum em clusters).
- Cache do framework (ex.:
tf.data.Dataset.cache()).
Cacheamento é especialmente eficaz quando:
- Seu conjunto de dados cabe no SSD local mas não na RAM.
- Você faz etapas caras de decodificação/parse que quer evitar repetir.
Cuidado: cachear depois de aumentação aleatória vai “congelar” a aleatoriedade, a menos que o cache seja colocado antes no pipeline.
Exemplos práticos
PyTorch: DataLoader eficiente para classificação de imagens
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
train_tfms = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=(0.485, 0.456, 0.406),
std=(0.229, 0.224, 0.225)),
])
ds = datasets.ImageFolder("/data/imagenet/train", transform=train_tfms)
loader = DataLoader(
ds,
batch_size=256,
shuffle=True, # local shuffle; for DDP use DistributedSampler
num_workers=8, # tune per machine
pin_memory=True,
persistent_workers=True, # avoid worker restart overhead each epoch
prefetch_factor=4, # batches prefetched per worker
drop_last=True,
)
device = "cuda"
for images, labels in loader:
images = images.to(device, non_blocking=True)
labels = labels.to(device, non_blocking=True)
# training step...
Notas
- Para treinamento distribuído, substitua
shuffle=Truepor umDistributedSamplere chamesampler.set_epoch(epoch)a cada época para reembaralhar de forma determinística entre ranks. - Se a CPU estiver saturada, considere menos trabalhadores mais “pesados”, bibliotecas de decodificação mais rápidas, ou migrar para formatos de registros fragmentados.
TensorFlow: pipeline `tf.data` com map paralelo e prefetch
import tensorflow as tf
files = tf.data.Dataset.list_files("/data/shards/train-*.tfrecord", shuffle=True)
def parse_example(record_bytes):
features = {
"image": tf.io.FixedLenFeature([], tf.string),
"label": tf.io.FixedLenFeature([], tf.int64),
}
ex = tf.io.parse_single_example(record_bytes, features)
img = tf.io.decode_jpeg(ex["image"], channels=3)
img = tf.image.resize(img, [224, 224])
img = tf.cast(img, tf.float32) / 255.0
img = tf.image.random_flip_left_right(img)
return img, ex["label"]
ds = (tf.data.TFRecordDataset(files, num_parallel_reads=tf.data.AUTOTUNE)
.shuffle(50_000) # buffer shuffle; tune for memory/statistics
.map(parse_example, num_parallel_calls=tf.data.AUTOTUNE)
.batch(256, drop_remainder=True)
.prefetch(tf.data.AUTOTUNE))
Notas
num_parallel_readshabilita interleaving entre fragmentos, melhorando vazão e aleatoriedade.- Posicione
.shuffle(...)com intenção: embaralhar depois do parsing pode embaralhar exemplos individuais; embaralhar antes pode embaralhar a ordem dos arquivos, mas não misturar dentro dos arquivos.
Pipelines de entrada distribuídos: fragmentação, sementes e semântica de “exatamente uma vez”
Estratégias de fragmentação
Em treinamento com múltiplos trabalhadores, cada trabalhador deve ler um subconjunto distinto para evitar duplicatas e computação desperdiçada.
Abordagens comuns:
- Fragmentação baseada em arquivos (file-based sharding): atribuir fragmentos aos trabalhadores de forma determinística (ex.: o trabalhador
ilê cadaN-ésimo fragmento). Simples e escalável. - Fragmentação baseada em índices (index-based sharding): para conjuntos com acesso aleatório, particionar índices entre trabalhadores.
- Ingestão baseada em serviço (service-based ingestion): um serviço de dados gerencia distribuição e balanceamento de carga (mais complexo, pode melhorar a utilização).
Definição de sementes e determinismo
Embaralhamento e aumentações aleatórias usam geradores de números aleatórios em múltiplas camadas:
- Python, NumPy
- RNG do framework
- RNG por trabalhador
- Kernels de GPU (às vezes não determinísticos)
Para reprodutibilidade entre execuções, você precisa de sementes consistentes e particionamento de dados consistente. Veja Reprodutibilidade para o panorama mais amplo.
Um padrão prático é:
- Usar uma semente global.
- Derivar semente por época:
seed_epoch = hash(global_seed, epoch). - Derivar semente por trabalhador:
seed_worker = hash(seed_epoch, worker_id).
Observe que determinismo estrito pode reduzir desempenho (ex.: forçar kernels determinísticos na GPU).
Tolerância a falhas e retomada
Em escala, jobs falham. Um pipeline robusto deve suportar retomada sem:
- repetir dados demais (a menos que isso seja aceitável),
- pular segmentos grandes de forma inesperada.
Isso é difícil em pipelines de streaming sem índices de amostras estáveis. Abordagens incluem:
- armazenar estado do cursor de dados (data cursor state) (id do fragmento + offset),
- usar fragmentação determinística baseada em época/passo (epoch/step-based deterministic sharding) para recomputar quais amostras pertencem a qual passo,
- confiar em semântica aproximada (comum em pré-treinamento em grande escala).
Veja Salvamento de Checkpoints e Tolerância a Falhas para considerações do estado de treinamento.
Medindo e depurando o desempenho do pipeline
O que medir
- Utilização da GPU/TPU: utilização baixa frequentemente indica gargalo de entrada.
- Decomposição do tempo por passo: tempo esperando dados vs computação.
- Utilização de CPU por trabalhador: os trabalhadores estão no limite?
- Vazão de E/S: taxas de leitura de disco/rede, taxas de acerto de cache.
- Profundidade das filas: as filas de pré-busca estão esvaziando?
Sintomas comuns e correções
Sintoma: GPUs ficam ociosas periodicamente (utilização em dente de serra)
- Aumente o tamanho do buffer de pré-busca.
- Reduza a variância de pré-processamento por lote.
- Mova dados para armazenamento local mais rápido ou habilite cacheamento.
Sintoma: CPU no limite, GPU subutilizada
- Reduza overhead de Python: evite código Python por amostra.
- Use transformações vetorizadas ou operadores compilados.
- Pré-compute etapas caras offline.
- Considere pré-processamento acelerado por GPU.
Sintoma: sistema de arquivos em rede/armazenamento de objetos com throttling
- Use leituras sequenciais maiores (dados fragmentados).
- Adicione cacheamento em SSD local.
- Aumente paralelismo de leitura com cuidado (evite “efeito manada (thundering herd)”).
Sintoma: convergência piora após otimizar o pipeline
- Verifique a qualidade do embaralhamento (buffer pequeno demais, desequilíbrio por rank).
- Garanta que aumentações não mudaram a semântica.
- Verifique se não há duplicatas/omissões entre trabalhadores.
Microbenchmark do carregador
Um teste simples, mas eficaz, é rodar o carregador sem computação do modelo e medir lotes/seg, depois comparar com a vazão requerida:
[ \text{required batches/sec} \approx \frac{1}{T_{\text{compute per step}}} ]
Se o carregador sozinho não consegue exceder essa taxa (idealmente por uma margem confortável), o treinamento ficará limitado pela entrada.
Boas práticas (checklist)
Embaralhamento e amostragem
- Prefira conjuntos de dados fragmentados e embaralhe tanto no nível de fragmento quanto no nível de exemplo.
- Use um buffer de embaralhamento grande o suficiente para quebrar correlações locais.
- Em treinamento distribuído, garanta fragmentação sem sobreposição e reembaralhamento por época.
Pré-processamento
- Leve o pré-processamento determinístico caro para offline (tokenização, redimensionamento, parsing).
- Mantenha aumentações online baratas e paralelizáveis.
- Evite loops em Python no caminho crítico.
Vazão
- Ajuste
num_workers/ chamadas paralelas com base nos núcleos de CPU e no comportamento do armazenamento. - Use pré-busca e trabalhadores persistentes para reduzir overhead e oscilação.
- Use memória fixada e transferências assíncronas para o dispositivo quando aplicável.
Escala e robustez
- Projete para a realidade de armazenamento remoto: leituras maiores, menos arquivos, cacheamento.
- Faça com que aleatoriedade e fragmentação sejam controláveis por sementes para depuração e reprodutibilidade.
- Considere semântica de retomada cedo se os treinos forem longos.
Como isso se encaixa em treinamento em escala
Pipelines de entrada interagem com quase todas as outras decisões de escalonamento:
- Tamanhos de lote global maiores (frequentemente viabilizados por Precisão Mista (Mixed Precision)) elevam os requisitos de vazão de dados.
- Estratégias distribuídas (veja Treinamento Distribuído) restringem como você fragmenta e embaralha.
- Escolhas de paralelismo (veja Paralelismo de Dados/Modelo/Pipeline) afetam a composição do lote por dispositivo e, portanto, a eficiência de colação/padding.
- Reproduzir resultados em escala exige controlar sementes e ordenação de dados (veja Reprodutibilidade).
Um pipeline de dados bem projetado não é apenas “E/S mais rápida” — é um sistema cuidadosamente construído que preserva propriedades estatísticas enquanto mantém aceleradores alimentados de forma consistente.