How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel?

Typical EncoderDecoderModel that works on a Pre-coded Dataset

The code snippet below is frequently used to train an EncoderDecoderModel from Huggingface’s transformers library

from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast

# The encoder, the decoder and the tokenizer are all loaded from one checkpoint.
checkpoint = "bert-base-multilingual-uncased"

# Build the encoder-decoder model from the pretrained checkpoint on both sides.
multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(checkpoint, checkpoint)

tokenizer = PreTrainedTokenizerFast.from_pretrained(checkpoint)

...

And a pre-processed/coded dataset can be used to train the model as such, when using the wmt14 dataset:

import datasets

# WMT14 de-en: the "train" split for training and the "validation[:10%]"
# slice for evaluation, loaded in that order.
train_data, val_data = (
    datasets.load_dataset("wmt14", "de-en", split=split_spec)
    for split_spec in ("train", "validation[:10%]")
)


from functools import partial

def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    """Tokenize a batch of en/de translation pairs into EncoderDecoderModel inputs.

    Args:
        batch: Batched examples with a ``"translation"`` column whose entries
            are dicts holding the ``"en"`` source and ``"de"`` target text.
        encoder_max_length: Length the source token ids are padded/truncated to.
        decoder_max_length: Length the target token ids are padded/truncated to.
        batch_size: Not used by this function; accepted so callers that pass
            it keep working.

    Returns:
        The same ``batch`` object with ``input_ids``, ``attention_mask``,
        ``decoder_input_ids``, ``decoder_attention_mask`` and ``labels`` added.
    """
    translations = batch["translation"]
    inputs = tokenizer([segment["en"] for segment in translations],
                       padding="max_length", truncation=True, max_length=encoder_max_length)
    # Targets are sized by the decoder limit, not the encoder's.
    outputs = tokenizer([segment["de"] for segment in translations],
                        padding="max_length", truncation=True, max_length=decoder_max_length)

    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask

    # because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`.
    # We have to make sure that the PAD token is ignored, so PAD ids become -100.
    # The comprehension builds fresh lists, leaving `decoder_input_ids` untouched.
    pad_token_id = tokenizer.pad_token_id
    batch["labels"] = [
        [-100 if token == pad_token_id else token for token in labels]
        for labels in outputs.input_ids
    ]
    return batch


def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    """Tokenize ``dataset`` and restrict its output to the model-input columns.

    Args:
        dataset: Dataset to run the batched tokenizing map over.
        encoder_max_length: Forwarded to ``process_data_to_model_inputs``.
        decoder_max_length: Forwarded to ``process_data_to_model_inputs``.
        batch_size: Batch size used by ``.map()``; also forwarded to
            ``process_data_to_model_inputs``.

    Returns:
        The mapped dataset, formatted as ``"torch"`` and limited to the
        input/attention/label columns.
    """
    tokenize_batch = partial(
        process_data_to_model_inputs,
        encoder_max_length=encoder_max_length,
        decoder_max_length=decoder_max_length,
        batch_size=batch_size,
    )
    tokenized = dataset.map(tokenize_batch, batched=True, batch_size=batch_size)

    # set_format() works in place on the mapped dataset.
    tokenized.set_format(
        type="torch",
        columns=[
            "input_ids",
            "attention_mask",
            "decoder_input_ids",
            "decoder_attention_mask",
            "labels",
        ],
    )
    return tokenized

# Run the same preprocessing over the training split, then the validation split.
train_data, val_data = map(munge_dataset_to_pacify_bert, (train_data, val_data))

Then the training can be done easily as such:

from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments


# set training arguments - these params are not really tuned, feel free to change
# NOTE: the bare `...` inside the call is a placeholder for further keyword
# arguments elided from this excerpt; replace it with real arguments before running.
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    ...
)


# instantiate trainer
# Hands the model, tokenizer, training arguments and the train/eval datasets to the trainer.
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)

# Start training.
trainer.train()

A working example can be found on something like: Neural Plasticity - Bert2Bert on WMT14 | Kaggle

However, the parallel data used to train an EncoderDecoderModel usually exists as .txt or .tsv files, not as a pre-coded dataset

Given a large .tsv file (e.g. 1 billion lines), e.g.

hello world\tHallo Welt
how are you?\twie gehts?
...\t...

Step 1: To convert it into the parquet / pyarrow format, one can do something like:

import vaex  # Using vaex 
import sys

filename = "train.en-de.tsv"

# The file is tab-separated with no header row; name the two columns explicitly.
csv_options = dict(sep="\t", header=None, names=["src", "trg"])
df = vaex.from_csv(filename, convert=True, chunk_size=50_000_000, **csv_options)

# Write the frame out next to the source file as parquet.
df.export(f"{filename}.parquet")

Step 2: Then we can read it into a Pyarrow table to fit into the datasets.Dataset object and use the munge_dataset_to_pacify_bert() as shown above, e.g.

from datasets import Dataset, load_from_disk
import pyarrow as pa

# The parquet/compute helpers live in pyarrow submodules; import them
# explicitly rather than relying on another package having loaded them.
import pyarrow.compute as pc
import pyarrow.parquet as pq

# Read the parquet export, drop the nulls, and wrap the table as a Dataset.
_ds = Dataset(pc.drop_null(pq.read_table('train.en-de.tsv.parquet')))
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset')

_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset')

train_data = munge_dataset_to_pacify_bert(_ds)

train_data.save_to_disk('train.en-de.tsv.parquet.hfdataset')

While the process above works well for a small-ish dataset, e.g. 1-5 million lines of data, when the scale of the data goes to 500 million to 1 billion lines, the end of the last .save_to_disk() call is nowhere in sight.

Breaking down the steps in the munge_dataset_to_pacify_bert(), there are 2 sub-functions:

  • dataset.map(_process_data_to_model_inputs, batched=True, batch_size=batch_size)
  • dataset.set_format(type="torch", columns=bert_wants_to_see)

For the .map() process, it’s possible to scale across parallel processes by specifying num_proc:

dataset.map(_process_data_to_model_inputs, 
    batched=True, batch_size=batch_size, 
    num_proc=32  # number of worker processes (multiprocessing, not threads).
    )

And when I tried to process with

  • num_proc=32
  • batch_size=100

The .map() function finishes the processing of 500 million lines in 18 hours of compute time on Intel Xeon E5-2686 @ 2.3GHz with 32 processor cores, optimally.

But somehow the .map() function created 32 temp .arrow files and 128 tmp... binary files. Seemingly the last save_to_disk function has been running for more than 10 hours and has not finished combining the temp files in parts to save the final HF Dataset to disk.


Given the above context, my questions in parts are:

Question (Part 1): When the mapping function ends and has created the temp .arrow and tmp... files, is there a way to read these individually instead of trying to save them into a final directory using the save_to_disk() function?


Question (Part 2): Why is the save_to_disk() function so slow after the mapping and how can the mapped processed data be saved in a faster manner?


Question (Part 3): Is there a way to avoid the .set_format() function after the .map() and make it part of the _process_data_to_model_inputs function?


Also asked on python - How to efficiently convert a large parallel corpus to a Huggingface dataset to train an EncoderDecoderModel? - Stack Overflow

Bumping with an update on how long save_to_disk has been running: 42 hours later, it’s still trying to save…

Killed the process after it ran for >60 hours, now I’m trying to rerun all the steps with some time profiling, with a 500M lines parquet file.

# The parquet/compute helpers live in pyarrow submodules; import them
# explicitly rather than relying on another package having loaded them.
import pyarrow.compute as pc
import pyarrow.parquet as pq

# Reading a parquet file and drop some empty lines, if any.
# Took ~415 secs, to load a parquet, drop null and resave it.
_ds = Dataset(pc.drop_null(pq.read_table('train.en-de.tsv.parquet')))
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset-1')
# Reloading the dataset
# Took ~524 secs
_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset-1')

Now, I’m re-running the munge_dataset_to_pacify_bert with .map() without the set_format() and timing it:

# Running the .map() without .set_format()
def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    """Run the batched tokenizing map over ``dataset`` without changing its format.

    Args:
        dataset: Dataset to map over.
        encoder_max_length: Forwarded to ``process_data_to_model_inputs``.
        decoder_max_length: Forwarded to ``process_data_to_model_inputs``.
        batch_size: Batch size used by ``.map()``; also forwarded to
            ``process_data_to_model_inputs``.

    Returns:
        The dataset returned by ``.map()``; no ``set_format()`` is applied.
    """
    tokenize_batch = partial(
        process_data_to_model_inputs,
        encoder_max_length=encoder_max_length,
        decoder_max_length=decoder_max_length,
        batch_size=batch_size,
    )
    return dataset.map(tokenize_batch, batched=True, batch_size=batch_size)

# Took ??? secs, will update the number after it's finished.
train_data = munge_dataset_to_pacify_bert(_ds)

# Saving the processed data.
# Took ??? secs.
train_data.save_to_disk("train.en-de.tsv.parquet.hfdataset-2")

# Doing the set_format
# Took ??? secs.
bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids",
                     "decoder_attention_mask", "labels"]
# set_format() changes the dataset in place and returns None, so its result
# must not be assigned back to train_data.
train_data.set_format(type="torch", columns=bert_wants_to_see)


# Saving the final dataset.
# Took ??? secs
train_data.save_to_disk("train.en-de.tsv.parquet.hfdataset-3")

Hi ! datasets can read TSV files, so if you create a directory data/ containing your TSV file you can do

from datasets import load_dataset
# Load whatever data files sit in the directory, then tokenize the result in one go.
ds = munge_dataset_to_pacify_bert(load_dataset("path/to/data"))

and the next time you re-run this code, it will reload from the cache.

Also note that running set_format is instantaneous - it simply changes the output format of the dataset, not the data itself.

Thanks for the note on the tsv file!

I’ll try to re-profile the timings again, to compare:

  • Read tsv → munge_dataset_to_pacify_bert
  • Vaex → read parquet → munge_dataset_to_pacify_bert
  • Read tsv → munge_dataset_to_pacify_bert → set_format → save
  • Vaex → read parquet → munge_dataset_to_pacify_bert → set_format → save
  • Read tsv → munge_dataset_to_pacify_bert → save
  • Vaex → read parquet → munge_dataset_to_pacify_bert → save

After

# Run the tokenizing map over the loaded dataset.
ds = munge_dataset_to_pacify_bert(ds)

should there be another .save() operation?

ds = munge_dataset_to_pacify_bert(ds)
# The datasets API for persisting to a directory is save_to_disk(); there is no .save().
ds.save_to_disk("path/to/data")

The save_to_disk operation is single-process, so yeah, it can be much slower than map, which works in parallel.

After more experiments and profiling, I am giving up on using the datasets and directly using DataPipe to connect it to Huggingface Trainer, e.g. PyTorch DataPipe + HuggingFace Trainer | Kaggle

If anyone knows a better way to handle large datasets, please do let me know too. Thank you in advance!

Note that you can also load your dataset in streaming mode if you pass streaming=True to load_dataset. You can use the same map functions you used already, but everything will be computed on-the-fly like a torch DataPipe.

This will save you a lot of time and disk space :wink:

Thank you for the streaming=True tip!!

That really saved TBs of data that would otherwise be mapped and temporarily saved, given my dataset =)

I’m not sure why all my posts were “flagged as spam by community”, so instead of sharing the kaggle link, here’s a working example for other future readers in markdown:

from collections import OrderedDict

import torch
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast

# Build the encoder-decoder model with the same pretrained checkpoint on both sides.
multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
    "bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)


tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")

# Reuse the tokenizer's CLS/SEP tokens as BOS/EOS and register '[PAD]' as the pad token.
tokenizer.bos_token = tokenizer.cls_token
tokenizer.eos_token = tokenizer.sep_token
tokenizer.add_special_tokens({'pad_token': '[PAD]'})

# set special tokens
multibert.config.decoder_start_token_id = tokenizer.bos_token_id
multibert.config.eos_token_id = tokenizer.eos_token_id
multibert.config.pad_token_id = tokenizer.pad_token_id

# The top-level config takes its vocabulary size from the decoder config.
multibert.config.vocab_size = multibert.config.decoder.vocab_size

# sensible parameters for beam search
multibert.config.max_length = 142
multibert.config.min_length = 56
multibert.config.no_repeat_ngram_size = 3
multibert.config.early_stopping = True
multibert.config.length_penalty = 2.0
multibert.config.num_beams = 4

from functools import partial

# Columns kept in the output of process_data_to_model_inputs().
bert_wants_to_see = [
    "input_ids",
    "attention_mask",
    "decoder_input_ids",
    "decoder_attention_mask",
    "labels",
]


def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512):
    """Tokenize the ``SRC``/``TRG`` columns of ``batch`` into model inputs.

    Args:
        batch: Batched examples providing ``"SRC"`` and ``"TRG"`` text columns.
        encoder_max_length: Length the ``SRC`` token ids are padded/truncated to.
        decoder_max_length: Length the ``TRG`` token ids are padded/truncated to.

    Returns:
        A new dict holding only the entries of ``batch`` whose key is listed in
        ``bert_wants_to_see``. ``batch`` itself also receives the new columns.
    """
    shared_options = dict(padding="max_length", truncation=True)
    source = tokenizer(batch["SRC"], max_length=encoder_max_length, **shared_options)
    target = tokenizer(batch["TRG"], max_length=decoder_max_length, **shared_options)

    # because BERT automatically shifts the labels, the labels correspond exactly
    # to `decoder_input_ids`; PAD positions become -100 so that they are ignored.
    pad_id = tokenizer.pad_token_id
    masked_labels = [
        [-100 if token_id == pad_id else token_id for token_id in sequence]
        for sequence in target.input_ids
    ]

    batch.update({
        "input_ids": source.input_ids,
        "attention_mask": source.attention_mask,
        "decoder_input_ids": target.input_ids,
        "decoder_attention_mask": target.attention_mask,
        "labels": masked_labels,
    })

    return {name: values for name, values in batch.items() if name in bert_wants_to_see}


def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512):
    """Apply the batched tokenizing map to ``dataset``.

    Args:
        dataset: Dataset to map over.
        encoder_max_length: Forwarded to ``process_data_to_model_inputs``.
        decoder_max_length: Forwarded to ``process_data_to_model_inputs``.

    Returns:
        The dataset returned by ``.map()``; no ``set_format()`` call is made.
    """
    tokenize_batch = partial(
        process_data_to_model_inputs,
        encoder_max_length=encoder_max_length,
        decoder_max_length=decoder_max_length,
    )
    return dataset.map(tokenize_batch, batched=True)

from datasets import load_dataset

# tatoeba-sentpairs.tsv is a pretty large file, so it is loaded in streaming mode.
ds = load_dataset(
    "csv",
    data_files="../input/tatoeba/tatoeba-sentpairs.tsv",
    delimiter="\t",
    split="train",
    streaming=True,
)

train_data = munge_dataset_to_pacify_bert(ds)

flores_ende = load_dataset("facebook/flores", "eng_Latn-deu_Latn", streaming=True,
                           split="dev")

# Rename the FLORES sentence columns to the SRC/TRG names the tokenizing function reads.
flores_ende = flores_ende.rename_column('sentence_eng_Latn', 'SRC')
flores_ende = flores_ende.rename_column('sentence_deu_Latn', 'TRG')

# munge_dataset_to_pacify_bert() takes a whole dataset and calls .map() itself.
# Handing it to .map() would invoke it on individual examples instead.
valid_data = munge_dataset_to_pacify_bert(flores_ende)

from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

import os
# Set the WANDB_DISABLED flag before the trainer is created.
os.environ["WANDB_DISABLED"] = "true"

batch_size = 1

# Untuned smoke-test settings, feel free to change. The trailing comments give
# the values intended for a full training run.
# Options left switched off here: overwrite_output_dir=True, fp16=True.
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    predict_with_generate=True,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    max_steps=16,     # delete for full training
    warmup_steps=1,   # set to 2000 for full training
    logging_steps=2,  # set to 1000 for full training
    eval_steps=4,     # set to 8000 for full training
    save_steps=16,    # set to 500 for full training
    save_total_limit=1,
)

# The streaming datasets are wrapped in IterableWrapper before being passed to the trainer.
trainer = Seq2SeqTrainer(
    model=multibert,
    args=training_args,
    tokenizer=tokenizer,
    train_dataset=IterableWrapper(train_data),
    eval_dataset=IterableWrapper(valid_data),
)

trainer.train()