forked from mrgigabyte/proxybot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
webhook_proxy_bot.py
181 lines (149 loc) · 7.71 KB
/
webhook_proxy_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This is the simplest Proxy Bot implementation, using Webhooks instead of Long Polling.
# Here we use a self-signed certificate.
#
# Quick'n'dirty Self-Signed SSL certificate generation:
#
# openssl genrsa -out webhook_pkey.pem 2048
# openssl req -new -x509 -days 3650 -key webhook_pkey.pem -out webhook_cert.pem
#
# When asked for "Common Name (e.g. server FQDN or YOUR name)" you should reply
# with the same value that you put in WEBHOOK_HOST
import telebot
import config
import dbhelper
import cherrypy
# Webhook settings. WEBHOOK_HOST and WEBHOOK_PORT form the public URL that is
# registered with Telegram via set_webhook; WEBHOOK_LISTEN is the address the
# CherryPy server binds to.
WEBHOOK_HOST = 'Your.Ip.Address.Here'
WEBHOOK_PORT = 443 # 443, 80, 88 or 8443 (the port needs to be open)
WEBHOOK_LISTEN = '0.0.0.0' # On some VPS/VDS hosts you may need to put the IP address here
# NOTE(review): the openssl example in the file header writes webhook_cert.pem /
# webhook_pkey.pem, while the defaults below are my_cert.pem / my_pkey.pem --
# point these at the files you actually generated.
WEBHOOK_SSL_CERT = 'my_cert.pem' # Path to the SSL certificate
WEBHOOK_SSL_PRIV = 'my_pkey.pem' # Path to the SSL private key
# Base URL plus a path built from the bot token; the two are concatenated when
# the webhook is registered, and the path is also where the CherryPy app is mounted.
WEBHOOK_URL_BASE = "https://{!s}:{!s}".format(WEBHOOK_HOST, WEBHOOK_PORT)
WEBHOOK_URL_PATH = "/{!s}/".format(config.token)
# Create the bot instance that all handlers below register on
bot = telebot.TeleBot(config.token)
# "/start" is always the first message when a new chat with the bot is created
@bot.message_handler(commands=["start"])
def command_start(message):
    """Greet the sender of "/start" and invite them to write their message."""
    chat_id = message.chat.id
    greeting = "Hello! Now please write your message to forward it to my owner!"
    bot.send_message(chat_id, greeting)
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["text"])
def my_text(message):
    """Relay a text reply written in the owner's chat (config.my_id).

    A message that is not a reply is answered with "No one to reply!".
    For a reply, the recipient is looked up with dbhelper.get_user_id using
    the id of the replied-to message; if the lookup yields nothing, the
    message is silently dropped.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        # Plain message (not a reply): nothing to relay, just notify.
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        # You may add parse_mode="Markdown" or parse_mode="HTML" here, but then
        # the markup MUST be well-formed as described at
        # https://core.telegram.org/bots/api#formatting-options
        # Otherwise the message won't be sent.
        bot.send_message(recipient_id, message.text)
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["sticker"])
def my_sticker(message):
    """Relay a sticker reply written in the owner's chat (config.my_id).

    A sticker that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        sticker_file_id = message.sticker.file_id
        bot.send_sticker(recipient_id, sticker_file_id)
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["photo"])
def my_photo(message):
    """Relay a photo reply written in the owner's chat (config.my_id).

    A photo that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        # The last entry of the photo array is the largest available size.
        available_sizes = list(message.photo)
        largest = available_sizes[-1]
        bot.send_photo(recipient_id, largest.file_id)
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["voice"])
def my_voice(message):
    """Relay a voice-message reply written in the owner's chat (config.my_id).

    A voice message that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        voice = message.voice
        # The original duration is passed along with the file id.
        bot.send_voice(recipient_id, voice.file_id, duration=voice.duration)
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["document"])
def my_document(message):
    """Relay a document reply written in the owner's chat (config.my_id).

    A document that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        # The file id is passed positionally on purpose: the second parameter
        # of send_document was called `data` in old pyTelegramBotAPI releases
        # and `document` in newer ones, so the keyword form `data=...` fails
        # with a missing-argument TypeError on current versions.
        bot.send_document(recipient_id, message.document.file_id)
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["audio"])
def my_audio(message):
    """Relay an audio reply written in the owner's chat (config.my_id).

    An audio message that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        track = message.audio
        # Performer, title and duration are copied from the incoming audio.
        bot.send_audio(
            recipient_id,
            audio=track.file_id,
            performer=track.performer,
            title=track.title,
            duration=track.duration,
        )
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["video"])
def my_video(message):
    """Relay a video reply written in the owner's chat (config.my_id).

    A video that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        video = message.video
        # The file id is passed positionally on purpose: the second parameter
        # of send_video was called `data` in old pyTelegramBotAPI releases and
        # `video` in newer ones, so the keyword form `data=...` fails with a
        # missing-argument TypeError on current versions.
        bot.send_video(recipient_id, video.file_id, duration=video.duration)
# NOTE: the original author could not test this handler (no Google Maps on
# their phone); it is expected to work but is unverified.
@bot.message_handler(func=lambda message: message.chat.id == config.my_id, content_types=["location"])
def my_location(message):
    """Relay a location reply written in the owner's chat (config.my_id).

    A location that is not a reply is answered with "No one to reply!".
    For a reply, the recipient comes from dbhelper.get_user_id for the
    replied-to message id; nothing is sent when that lookup is empty.
    """
    replied_to = message.reply_to_message
    if not replied_to:
        bot.send_message(message.chat.id, "No one to reply!")
        return

    recipient_id = dbhelper.get_user_id(replied_to.message_id)
    if recipient_id:
        point = message.location
        bot.send_location(recipient_id, latitude=point.latitude, longitude=point.longitude)
# Handle every incoming message whose chat is not the owner's chat
# (the filter only compares against config.my_id; it does not exclude groups).
@bot.message_handler(func=lambda message: message.chat.id != config.my_id,
                     content_types=['text', 'audio', 'document', 'photo', 'sticker', 'video',
                                    'voice', 'location', 'contact'])
def check(message):
    """Forward a message to the owner and remember who sent it.

    The message is forwarded to config.my_id, and the id of the forwarded
    copy is stored together with the sender's chat id via
    dbhelper.add_message, so that a later reply to that copy can be routed
    back by the owner-side handlers.
    """
    forwarded = bot.forward_message(config.my_id, message.chat.id, message.message_id)
    # Use the id reported for the forwarded copy instead of guessing
    # `message.message_id + 1`: the guess is wrong whenever another message is
    # processed between receiving this one and forwarding it.
    dbhelper.add_message(forwarded.message_id, message.chat.id)
# WebhookServer: receives the webhook calls and feeds them to the bot
class WebhookServer(object):
    """CherryPy application exposing the single webhook endpoint."""

    @cherrypy.expose
    def index(self):
        """Process one webhook request.

        The request must carry both a content-length and a content-type
        header, and the content type must be exactly 'application/json';
        anything else is rejected with HTTP 403. The body is decoded as
        UTF-8, parsed into a telebot Update, and its message and/or inline
        query (when present) are passed to the bot. Returns an empty string.
        """
        headers = cherrypy.request.headers
        looks_like_update = (
            'content-length' in headers
            and 'content-type' in headers
            and headers['content-type'] == 'application/json'
        )
        if not looks_like_update:
            raise cherrypy.HTTPError(403)

        body_size = int(headers['content-length'])
        raw_body = cherrypy.request.body.read(body_size)
        update = telebot.types.Update.de_json(raw_body.decode("utf-8"))
        if update.message:
            bot.process_new_messages([update.message])
        if update.inline_query:
            bot.process_new_inline_query([update.inline_query])
        return ''
if __name__ == '__main__':
    # Remove any previous webhook first; setting a new one sometimes fails
    # when an old one is still registered.
    bot.remove_webhook()

    # Register the webhook and upload the certificate. The context manager
    # closes the certificate file as soon as the call returns, so the handle
    # is not kept open while the server runs.
    with open(WEBHOOK_SSL_CERT, 'r') as cert_file:
        bot.set_webhook(url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,
                        certificate=cert_file)

    # Serve HTTPS with CherryPy's builtin SSL module on the configured
    # address and port.
    cherrypy.config.update({
        'server.socket_host': WEBHOOK_LISTEN,
        'server.socket_port': WEBHOOK_PORT,
        'server.ssl_module': 'builtin',
        'server.ssl_certificate': WEBHOOK_SSL_CERT,
        'server.ssl_private_key': WEBHOOK_SSL_PRIV,
    })

    # Mount the webhook app at the token-derived path and start serving.
    cherrypy.quickstart(WebhookServer(), WEBHOOK_URL_PATH, {'/': {}})