-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathvoicevox.py
147 lines (115 loc) · 4.92 KB
/
voicevox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import requests
import os
from tempfile import NamedTemporaryFile
from google.cloud import storage
import subprocess
from pydub.utils import mediainfo
import langid
import google.auth.transport.requests
import google.oauth2.id_token
# Value of the LINE_ACCESS_TOKEN environment variable; None when it is unset.
LINE_ACCESS_TOKEN = os.environ.get('LINE_ACCESS_TOKEN')
def get_google_cloud_token(audience):
    """Fetch an ID token for ``audience`` via ``google.oauth2.id_token``.

    Args:
        audience: Audience value handed to ``fetch_id_token``.

    Returns:
        Whatever ``google.oauth2.id_token.fetch_id_token`` returns for a
        fresh ``google.auth.transport.requests.Request`` and ``audience``.
    """
    transport_request = google.auth.transport.requests.Request()
    return google.oauth2.id_token.fetch_id_token(transport_request, audience)
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and return its URL.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to store the file under.

    Returns:
        A ``https://storage.googleapis.com/<bucket>/<object>`` URL built from
        the two names. This function does not change the object's access
        settings, so whether the URL is readable depends on the bucket.

    Raises:
        Exception: Any error raised while uploading is printed and re-raised.
    """
    try:
        client = storage.Client()
        target = client.bucket(bucket_name).blob(destination_blob_name)
        target.upload_from_filename(source_file_name)
        # The URL is assembled by hand rather than requested from the client.
        return f"https://storage.googleapis.com/{bucket_name}/{destination_blob_name}"
    except Exception as e:
        print(f"Failed to upload file: {e}")
        raise
def convert_audio_to_m4a(input_path, output_path):
    """Transcode an audio file to AAC by invoking ``ffmpeg``.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path of the file ffmpeg should write. An existing file
            at this path is overwritten.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status
            (its captured stdout/stderr are attached to the exception).
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    # -y: never stop to ask "Overwrite? [y/N]" -- this runs non-interactively,
    # so a prompt would either block on stdin or abort the conversion.
    command = ['ffmpeg', '-y', '-i', input_path, '-c:a', 'aac', output_path]
    subprocess.run(command, check=True, capture_output=True, text=True)
def text_to_speech(text, bucket_name, destination_blob_name, voicevox_url, style_id):
    """Synthesize ``text`` with a VOICEVOX server and upload it as M4A.

    The function posts to ``<voicevox_url>/audio_query`` and then to
    ``<voicevox_url>/synthesis``, writes the returned audio to a temporary
    WAV file, converts it to M4A, measures its duration and uploads it.

    Args:
        text: Text to synthesize.
        bucket_name: Cloud Storage bucket that receives the M4A file.
        destination_blob_name: Object name for the uploaded file.
        voicevox_url: Base URL of the VOICEVOX service (no trailing slash is
            added by the caller; one is appended for the token audience).
        style_id: Value sent as the ``speaker`` parameter on both requests.

    Returns:
        ``(public_url, m4a_path, duration)`` on success. The local M4A file
        at ``m4a_path`` is left in place for the caller to remove.
        ``None`` if either HTTP request answers with a status other than 200.

    Raises:
        Exception: Errors from token retrieval, the HTTP calls, the ffmpeg
            conversion or the upload propagate unchanged. Temporary files
            created by this call are removed before the error propagates.
    """
    # The service URL plus a trailing slash is used as the ID-token audience.
    audience = f"{voicevox_url}/"
    auth_token = get_google_cloud_token(audience)
    headers = {
        'Authorization': f'Bearer {auth_token}'
    }

    query_endpoint = f"{voicevox_url}/audio_query"
    synthesis_endpoint = f"{voicevox_url}/synthesis"

    # Step 1: obtain the synthesis query for the text.
    query_response = requests.post(
        query_endpoint,
        params={'text': text, 'speaker': style_id},
        headers=headers,
    )
    if query_response.status_code != 200:
        print(f'Error: Failed to get audio query. Status Code: {query_response.status_code}, Response: {query_response.text}')
        return None
    query_data = query_response.json()

    # Step 2: send the query back unchanged to get the audio bytes.
    synthesis_response = requests.post(
        synthesis_endpoint,
        json=query_data,
        params={'speaker': style_id},
        headers=headers,
    )
    if synthesis_response.status_code != 200:
        print('Error: Failed to synthesize audio.')
        return None

    # delete=False because ffmpeg has to reopen the file by name after it is
    # closed here; the file is removed explicitly below.
    temp = NamedTemporaryFile(suffix=".wav", delete=False)
    wav_path = temp.name
    # Swap only the extension; str.replace would also rewrite a ".wav" that
    # happened to appear in a directory name.
    m4a_path = os.path.splitext(wav_path)[0] + ".m4a"

    try:
        try:
            with temp:
                temp.write(synthesis_response.content)
            # The WAV is closed (and therefore fully flushed) before ffmpeg
            # reads it.
            convert_audio_to_m4a(wav_path, m4a_path)
        finally:
            # The WAV is only an intermediate; never leave it behind.
            delete_local_file(wav_path)

        # Measure the local file before uploading it.
        duration = get_duration(m4a_path)
        public_url = upload_blob(bucket_name, m4a_path, destination_blob_name)
    except Exception:
        # On failure the caller never learns m4a_path, so clean it up here.
        delete_local_file(m4a_path)
        raise

    return public_url, m4a_path, duration
def delete_local_file(file_path):
    """Remove ``file_path`` if it names an existing regular file.

    Paths that do not exist, or that are not regular files (for example
    directories), are left untouched and no error is raised for them.

    Args:
        file_path: Path of the file to remove.
    """
    if not os.path.isfile(file_path):
        return
    os.remove(file_path)
def delete_blob(bucket_name, blob_name):
    """Delete one object from a Cloud Storage bucket.

    Args:
        bucket_name: Name of the bucket holding the object.
        blob_name: Name of the object to delete.
    """
    client = storage.Client()
    client.bucket(bucket_name).blob(blob_name).delete()
def get_duration(file_path):
    """Return the duration of a media file in milliseconds.

    The duration is read from the ``'duration'`` entry of
    ``mediainfo(file_path)``, interpreted as seconds, and rounded to the
    nearest millisecond.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        The duration as an ``int`` number of milliseconds, or ``0`` when the
        entry is missing or cannot be interpreted as a finite number.
    """
    info = mediainfo(file_path)
    raw_duration = info.get('duration')  # None when there is no duration entry
    if raw_duration is None:
        print(f"No duration information found for {file_path}.")
        return 0  # fallback default
    try:
        # Scale to milliseconds BEFORE dropping the fraction; truncating the
        # seconds first would turn e.g. 2.9 s into 2000 ms.
        return round(float(raw_duration) * 1000)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric, NaN or infinite values get the same fallback as a
        # missing entry instead of crashing the caller.
        print(f"Unusable duration value {raw_duration!r} for {file_path}.")
        return 0
def set_bucket_lifecycle(bucket_name, age):
    """Set a single age-based delete rule as the bucket's lifecycle rules.

    The bucket's ``lifecycle_rules`` are assigned a one-element list, so any
    rules previously configured on the bucket are replaced.

    Args:
        bucket_name: Name of the bucket to configure.
        age: Value for the rule's ``age`` condition.
    """
    bucket = storage.Client().get_bucket(bucket_name)
    bucket.lifecycle_rules = [
        {
            'action': {'type': 'Delete'},
            # The number of days after object creation
            'condition': {'age': age},
        }
    ]
    bucket.patch()
def bucket_exists(bucket_name):
    """Return the result of ``exists()`` for the named Cloud Storage bucket.

    Args:
        bucket_name: Name of the bucket to look up.
    """
    return storage.Client().bucket(bucket_name).exists()
def put_audio_voicevox(userId, message_id, response, BACKET_NAME, FILE_AGE, voicevox_url, style_id):
    """Synthesize ``response`` and store it as ``<userId>/<message_id>.m4a``.

    Args:
        userId: First path component of the object name.
        message_id: Base name of the object (``.m4a`` is appended).
        response: Text passed to ``text_to_speech``.
        BACKET_NAME: Name of the Cloud Storage bucket to upload to.
        FILE_AGE: ``age`` value for the bucket's delete lifecycle rule.
        voicevox_url: Base URL of the VOICEVOX service.
        style_id: Speaker/style identifier passed to ``text_to_speech``.

    Returns:
        ``(public_url, local_path, duration)`` as produced by
        ``text_to_speech``. If the bucket does not exist, the string ``'OK'``
        is returned instead and nothing is synthesized.

    Raises:
        TypeError: If ``text_to_speech`` returns ``None``, because its result
            is unpacked into three values here.
    """
    if not bucket_exists(BACKET_NAME):
        print(f"Bucket {BACKET_NAME} does not exist.")
        return 'OK'
    # The lifecycle rule is (re)applied on every call before uploading.
    set_bucket_lifecycle(BACKET_NAME, FILE_AGE)

    object_name = f'{userId}/{message_id}.m4a'
    audio_url, m4a_file, length_ms = text_to_speech(
        response, BACKET_NAME, object_name, voicevox_url, style_id
    )
    return audio_url, m4a_file, length_ms