-
Notifications
You must be signed in to change notification settings - Fork 0
/
custom_dataset.py
106 lines (87 loc) · 3.63 KB
/
custom_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import NamedTuple, Callable
import jax.numpy as xp
# dataset = "play"
base_path = Path("/Data/nlp/")
def load(dataset: str = "english"):
path = base_path / dataset
books = [f for f in os.listdir(path) if f.endswith('.txt')]
def read_book_file(filename: str):
print(f"reading {filename}...")
try:
with open(path / filename, 'r', encoding='utf-8') as f:
return f.read()
except UnicodeDecodeError:
try:
with open(path / filename, 'r', encoding='gb2312') as f:
return f.read()
except UnicodeDecodeError:
try:
with open(path / filename, 'r', encoding='gbk') as f:
return f.read()
except UnicodeDecodeError:
try:
with open(path / filename, 'r', encoding='big5') as f:
return f.read()
except UnicodeDecodeError:
try:
with open(path / filename, 'r', encoding='utf-16') as f:
return f.read()
except UnicodeDecodeError:
try:
with open(path / filename, 'r', encoding='gb18030') as f:
return f.read()
except UnicodeDecodeError:
raise Exception(f"Failed to read {filename} with many encodings")
text = "\n\n".join(f"{book_name}\n\n {read_book_file(book_name)}" for book_name in books)
chars = [ch for ch, c in Counter(text).most_common()]
vocab_size = len(chars)
stoi = {c: i for i, c in enumerate(chars)}
itos = {i: c for i, c in enumerate(chars)}
def encode(_text: str):
return [stoi[c] for c in _text]
def decode(_encoded: list):
return "".join(itos[i] for i in _encoded)
return text, encode, decode, vocab_size
def load_jax_cached(dataset: str = "english"):
text, encode, decode, vocab_size = load(dataset)
cache_path = base_path / dataset / 'encoded_jax.npy'
try:
with open(cache_path, 'rb') as f:
encoded_jax = xp.load(f)
except FileNotFoundError:
encoded = encode(text)
encoded_jax = xp.array(encoded, dtype=xp.int16)
print(encoded_jax.shape, encoded_jax.dtype)
with open(cache_path, 'wb') as fw:
xp.save(fw, encoded_jax)
return encoded_jax, encode, decode, vocab_size
@dataclass
class Tokens:
ids: list[int]
class Tokenizer(NamedTuple):
vocab_size: int
encode_: Callable[[str], Tokens]
decode_: Callable[[list[int]], str]
def encode(self, text: str) -> Tokens:
return self.encode_(text)
def decode(self, tokens: list[int]) -> str:
return self.decode_(tokens)
def get_vocab_size(self) -> int:
return self.vocab_size
def load_jax_cached_tokenizer(base_path: Path, dataset: str = "english") -> tuple[xp.ndarray, Tokenizer]:
text, encode, decode, vocab_size = load(dataset)
cache_path = base_path / dataset / 'encoded_jax.npy'
try:
with open(cache_path, 'rb') as f:
encoded_jax = xp.load(f)
except FileNotFoundError:
encoded = encode(text)
encoded_jax = xp.array(encoded, dtype=xp.int16)
print(encoded_jax.shape, encoded_jax.dtype)
with open(cache_path, 'wb') as fw:
xp.save(fw, encoded_jax)
return encoded_jax, Tokenizer(vocab_size, lambda x: Tokens(encode(x)), decode)