| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| from pathlib import Path |
|
|
| import k2 |
| import sentencepiece as spm |
| import torch |
|
|
| from icefall.lexicon import Lexicon |
| from icefall.utils import parse_bpe_timestamps_and_texts, parse_timestamps_and_texts |
|
|
| ICEFALL_DIR = Path(__file__).resolve().parent.parent |
|
|
|
|
| def test_parse_bpe_timestamps_and_texts(): |
| lang_dir = ICEFALL_DIR / "egs/librispeech/ASR/data/lang_bpe_500" |
| if not lang_dir.is_dir(): |
| print(f"{lang_dir} does not exist.") |
| return |
|
|
| sp = spm.SentencePieceProcessor() |
| sp.load(str(lang_dir / "bpe.model")) |
|
|
| text_1 = "HELLO WORLD" |
| token_ids_1 = sp.encode(text_1, out_type=int) |
| |
| |
|
|
| |
| labels_1 = ( |
| token_ids_1[0:1] * 2 |
| + token_ids_1[1:3] |
| + [0] * 2 |
| + token_ids_1[3:4] * 3 |
| + [0] * 2 |
| ) |
| |
| aux_labels_1 = ( |
| token_ids_1[0:1] |
| + [0] |
| + token_ids_1[1:3] |
| + [0] * 2 |
| + token_ids_1[3:4] |
| + [0] * 4 |
| + [-1] |
| ) |
| fsa_1 = k2.linear_fsa(labels_1) |
| fsa_1.aux_labels = torch.tensor(aux_labels_1).to(torch.int32) |
|
|
| text_2 = "SAY GOODBYE" |
| token_ids_2 = sp.encode(text_2, out_type=int) |
| |
| |
|
|
| |
| labels_2 = ( |
| token_ids_2[0:1] + [0] * 2 + token_ids_2[1:2] * 2 + token_ids_2[2:5] + [0] * 2 |
| ) |
| |
| aux_labels_2 = ( |
| token_ids_2[0:1] |
| + [0] * 2 |
| + token_ids_2[1:2] |
| + [0] |
| + token_ids_2[2:5] |
| + [0] * 2 |
| + [-1] |
| ) |
| fsa_2 = k2.linear_fsa(labels_2) |
| fsa_2.aux_labels = torch.tensor(aux_labels_2).to(torch.int32) |
|
|
| fsa_vec = k2.create_fsa_vec([fsa_1, fsa_2]) |
|
|
| utt_index_pairs, utt_words = parse_bpe_timestamps_and_texts(fsa_vec, sp) |
| assert utt_index_pairs[0] == [(0, 3), (6, 8)], utt_index_pairs[0] |
| assert utt_words[0] == ["HELLO", "WORLD"], utt_words[0] |
| assert utt_index_pairs[1] == [(0, 0), (3, 7)], utt_index_pairs[1] |
| assert utt_words[1] == ["SAY", "GOODBYE"], utt_words[1] |
|
|
|
|
| def test_parse_timestamps_and_texts(): |
| lang_dir = ICEFALL_DIR / "egs/librispeech/ASR/data/lang_bpe_500" |
| if not lang_dir.is_dir(): |
| print(f"{lang_dir} does not exist.") |
| return |
|
|
| lexicon = Lexicon(lang_dir) |
|
|
| sp = spm.SentencePieceProcessor() |
| sp.load(str(lang_dir / "bpe.model")) |
| word_table = lexicon.word_table |
|
|
| text_1 = "HELLO WORLD" |
| token_ids_1 = sp.encode(text_1, out_type=int) |
| |
| |
| word_ids_1 = [word_table[s] for s in text_1.split()] |
| |
| labels_1 = ( |
| token_ids_1[0:1] * 2 |
| + token_ids_1[1:3] |
| + [0] * 2 |
| + token_ids_1[3:4] * 3 |
| + [0] * 2 |
| ) |
| |
| aux_labels_1 = [word_ids_1[0:1]] + [[]] * 5 + [word_ids_1[1:2]] + [[]] * 5 |
|
|
| fsa_1 = k2.linear_fsa(labels_1) |
| fsa_1.aux_labels = k2.RaggedTensor(aux_labels_1) |
|
|
| text_2 = "SAY GOODBYE" |
| token_ids_2 = sp.encode(text_2, out_type=int) |
| |
| |
| word_ids_2 = [word_table[s] for s in text_2.split()] |
| |
| labels_2 = ( |
| token_ids_2[0:1] + [0] * 2 + token_ids_2[1:2] * 2 + token_ids_2[2:5] + [0] * 2 |
| ) |
| |
| aux_labels_2 = [word_ids_2[0:1]] + [[]] * 2 + [word_ids_2[1:2]] + [[]] * 7 |
|
|
| fsa_2 = k2.linear_fsa(labels_2) |
| fsa_2.aux_labels = k2.RaggedTensor(aux_labels_2) |
|
|
| fsa_vec = k2.create_fsa_vec([fsa_1, fsa_2]) |
|
|
| utt_index_pairs, utt_words = parse_timestamps_and_texts(fsa_vec, word_table) |
| assert utt_index_pairs[0] == [(0, 3), (6, 8)], utt_index_pairs[0] |
| assert utt_words[0] == ["HELLO", "WORLD"], utt_words[0] |
| assert utt_index_pairs[1] == [(0, 0), (3, 7)], utt_index_pairs[1] |
| assert utt_words[1] == ["SAY", "GOODBYE"], utt_words[1] |
|
|
|
|
| if __name__ == "__main__": |
| test_parse_bpe_timestamps_and_texts() |
| test_parse_timestamps_and_texts() |
|
|