Skip to content

Commit

Permalink
feat: add multi-client support
Browse files Browse the repository at this point in the history
  • Loading branch information
EverythingSuckz committed Nov 28, 2023
1 parent a6be456 commit 7c853c0
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 22 deletions.
33 changes: 31 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
<ul>
<li><a href="#optional-vars">Optional environment variables</a></li>
</ul>
<ul>
<li><a href="#use-multiple-bots-to-speed-up">Using multiple bots</a></li>
</ul>
</li>
<li><a href="#contributing">Contributing</a></li>
<li><a href="#contact-me">Contact me</a></li>
Expand Down Expand Up @@ -73,10 +76,13 @@ An example of `.env` file:
```sh
API_ID=452525
API_HASH=esx576f8738x883f3sfzx83
BOT_TOKEN=55838383:yourtbottokenhere
BOT_TOKEN=55838383:yourbottokenhere
BIN_CHANNEL=-10045145224562
PORT=8080
HOST=http://yourserverip
# (if you want to set up multiple bots)
MULTI_TOKEN1=55838373:yourworkerbottokenhere
MULTI_TOKEN2=55838355:yourworkerbottokenhere
```

### Required Vars
Expand All @@ -97,7 +103,30 @@ In addition to the mandatory variables, you can also set the following optional

- `HOST` : A Fully Qualified Domain Name if present or use your server IP. (eg. `https://example.com` or `http://14.1.154.2:8080`)

- `HASH_LENGTH` : This is the custom hash length for generated URLs. The hash length must be greater than 5 and less than or equal to 32. The default value is 6.
- `HASH_LENGTH` : Custom hash length for generated URLs. The hash length must be greater than 5 and less than or equal to 32. The default value is 6.

- `USE_SESSION_FILE` : Use session files for worker client(s). This speeds up the worker bot startups. (default: `false`)

### Use Multiple Bots to speed up

> **Note**
> What it multi-client feature and what it does? <br>
> This feature shares the Telegram API requests between worker bots to speed up download speed when many users are using the server and to avoid the flood limits that are set by Telegram. <br>
> **Note**
> You can add up to 50 bots since 50 is the max amount of bot admins you can set in a Telegram Channel.
To enable multi-client, generate new bot tokens and add it as your `.env` with the following key names.

`MULTI_TOKEN1`: Add your first bot token here.

`MULTI_TOKEN2`: Add your second bot token here.

you may also add as many as bots you want. (max limit is 50)
`MULTI_TOKEN3`, `MULTI_TOKEN4`, etc.

> **Warning**
> Don't forget to add all these worker bots to the `BIN_CHANNEL` for the proper functioning
## Contributing

Expand Down
2 changes: 1 addition & 1 deletion bot/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func StartClient(log *zap.Logger) (*gotgproto.Client, error) {
BotToken: config.ValueOf.BotToken,
},
&gotgproto.ClientOpts{
Session: sessionMaker.NewSession("fsb", sessionMaker.Session),
Session: sessionMaker.SqliteSession("fsb"),
DisableCopyright: true,
},
)
Expand Down
144 changes: 144 additions & 0 deletions bot/workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package bot

import (
"EverythingSuckz/fsb/config"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/celestix/gotgproto"
"github.com/celestix/gotgproto/sessionMaker"
"github.com/gotd/td/tg"
"go.uber.org/zap"
)

type Worker struct {
ID int
Client *gotgproto.Client
Self *tg.User
log *zap.Logger
}

func (w *Worker) String() string {
return fmt.Sprintf("{Worker (%d|@%s)}", w.ID, w.Self.Username)
}

type BotWorkers struct {
Bots []*Worker
starting int
index int
mut sync.Mutex
log *zap.Logger
}

var Workers *BotWorkers = &BotWorkers{
log: nil,
Bots: make([]*Worker, 0),
}

func (w *BotWorkers) Init(log *zap.Logger) {
w.log = log.Named("Workers")
}

func (w *BotWorkers) AddDefaultClient(client *gotgproto.Client, self *tg.User) {
if w.Bots == nil {
w.Bots = make([]*Worker, 0)
}
w.incStarting()
w.Bots = append(w.Bots, &Worker{
Client: client,
ID: w.starting,
Self: self,
})
w.log.Sugar().Info("Default bot loaded")
}

func (w *BotWorkers) incStarting() {
w.mut.Lock()
defer w.mut.Unlock()
w.starting++
}

func (w *BotWorkers) Add(token string) (err error) {
w.incStarting()
var botID int = w.starting
client, err := startWorker(w.log, token, botID)
if err != nil {
return err
}
w.log.Sugar().Infof("Bot @%s loaded with ID %d", client.Self.Username, botID)
w.Bots = append(w.Bots, &Worker{
Client: client,
ID: botID,
Self: client.Self,
log: w.log,
})
return nil
}

func GetNextWorker() *Worker {
Workers.mut.Lock()
defer Workers.mut.Unlock()
index := (Workers.index + 1) % len(Workers.Bots)
Workers.index = index
worker := Workers.Bots[index]
Workers.log.Sugar().Infof("Using worker %d", worker.ID)
return worker
}

func StartWorkers(log *zap.Logger) {
log.Sugar().Info("Starting workers")
Workers.Init(log)
if config.ValueOf.UseSessionFile {
log.Sugar().Info("Using session file for workers")
newpath := filepath.Join(".", "sessions")
err := os.MkdirAll(newpath, os.ModePerm)
if err != nil {
log.Error("Failed to create sessions directory", zap.Error(err))
return
}
}
c := make(chan struct{})
for i := 0; i < len(config.ValueOf.MultiTokens); i++ {
go func(i int) {
err := Workers.Add(config.ValueOf.MultiTokens[i])
if err != nil {
log.Error("Failed to start worker", zap.Error(err))
return
}
c <- struct{}{}
}(i)
}
// wait for all workers to start
log.Sugar().Info("Waiting for all workers to start")
for i := 0; i < len(config.ValueOf.MultiTokens); i++ {
<-c
}
}

func startWorker(l *zap.Logger, botToken string, index int) (*gotgproto.Client, error) {
log := l.Named("Worker").Sugar()
log.Infof("Starting worker with index - %d", index)
var sessionType *sessionMaker.SqliteSessionConstructor
if config.ValueOf.UseSessionFile {
sessionType = sessionMaker.SqliteSession(fmt.Sprintf("sessions/worker-%d", index))
} else {
sessionType = sessionMaker.SqliteSession(":memory:")
}
client, err := gotgproto.NewClient(
int(config.ValueOf.ApiID),
config.ValueOf.ApiHash,
gotgproto.ClientType{
BotToken: botToken,
},
&gotgproto.ClientOpts{
Session: sessionType,
DisableCopyright: true,
},
)
if err != nil {
return nil, err
}
return client, nil
}
4 changes: 2 additions & 2 deletions commands/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func sendLink(ctx *ext.Context, u *ext.Update) error {
return dispatcher.EndGroups
}
chatId := u.EffectiveChat().GetID()
peerChatId := storage.GetPeerById(chatId)
peerChatId := ctx.PeerStorage.GetPeerById(chatId)
if peerChatId.Type != int(storage.TypeUser) {
return dispatcher.EndGroups
}
peer := storage.GetPeerById(config.ValueOf.LogChannelID)
peer := ctx.PeerStorage.GetPeerById(config.ValueOf.LogChannelID)
switch storage.EntityType(peer.Type) {
case storage.TypeChat:
return dispatcher.EndGroups
Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package config

import (
"os"
"reflect"
"regexp"
"strconv"
"strings"

Expand All @@ -20,13 +23,24 @@ type config struct {
Port int `envconfig:"PORT" default:"8080"`
Host string `envconfig:"HOST" default:"http://localhost:8080"`
HashLength int `envconfig:"HASH_LENGTH" default:"6"`
UseSessionFile bool `envconfig:"USE_SESSION_FILE" default:"true"`
MultiTokens []string
}

var botTokenRegex = regexp.MustCompile(`MULTI\_TOKEN[\d+]=(.*)`)

func (c *config) setupEnvVars() {
err := envconfig.Process("", c)
if err != nil {
panic(err)
}
val := reflect.ValueOf(c).Elem()
for _, env := range os.Environ() {
if strings.HasPrefix(env, "MULTI_TOKEN") {
c.MultiTokens = append(c.MultiTokens, botTokenRegex.FindStringSubmatch(env)[1])
}
}
val.FieldByName("MultiTokens").Set(reflect.ValueOf(c.MultiTokens))
}

func Load(log *zap.Logger) {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module EverythingSuckz/fsb
go 1.21.3

require (
github.com/celestix/gotgproto v1.0.0-beta13
github.com/celestix/gotgproto v1.0.0-beta13.0.20231124171805-6c04fae60b80
github.com/gin-gonic/gin v1.9.1
github.com/gotd/td v0.89.0
github.com/joho/godotenv v1.5.1
Expand Down Expand Up @@ -58,9 +58,9 @@ require (
github.com/ugorji/go/codec v1.2.11 // indirect
go.uber.org/zap v1.26.0
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/celestix/gotgproto v1.0.0-beta13 h1:5BlGUJMwJmXrWD9RhBbHRuJhbPkv5CJd04x/sDCpYeg=
github.com/celestix/gotgproto v1.0.0-beta13/go.mod h1:WHwqFwgXEpFN/2ReP+vVnxCs2IvULaRK7n0N5ouVmDw=
github.com/celestix/gotgproto v1.0.0-beta13.0.20231124171805-6c04fae60b80 h1:OhekKvJhPQx7jkPVomt7rD8tAlnel4l/sR6X7D9Pkpw=
github.com/celestix/gotgproto v1.0.0-beta13.0.20231124171805-6c04fae60b80/go.mod h1:sPTsFAhN6apWcxCLc07LFEkb9wuoQa9L7JxXl6znLY4=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down Expand Up @@ -124,17 +126,23 @@ golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0=
golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ func main() {
mainLogger.Info("Starting server")
config.Load(log)
router := getRouter(log)

_, err := bot.StartClient(log)
if err != nil {
log.Info(err.Error())
return
}
cache.InitCache(log)
bot.StartWorkers(log)
mainLogger.Info("Server started", zap.Int("port", config.ValueOf.Port))
mainLogger.Info("File Stream Bot", zap.String("version", versionString))
err = router.Run(fmt.Sprintf(":%d", config.ValueOf.Port))
Expand Down
6 changes: 4 additions & 2 deletions routes/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func getStreamRoute(ctx *gin.Context) {
var start, end int64
rangeHeader := r.Header.Get("Range")

file, err := utils.FileFromMessage(ctx, bot.Bot.Client, messageID)
worker := bot.GetNextWorker()

file, err := utils.FileFromMessage(ctx, worker.Client, messageID)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -100,7 +102,7 @@ func getStreamRoute(ctx *gin.Context) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
lr, _ := utils.NewTelegramReader(ctx, bot.Bot.Client, file.Location, start, end, contentLength)
lr, _ := utils.NewTelegramReader(ctx, worker.Client, file.Location, start, end, contentLength)
if _, err := io.CopyN(w, lr, contentLength); err != nil {
log.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error(err.Error())
}
Expand Down
15 changes: 8 additions & 7 deletions utils/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"errors"
"fmt"

"github.com/gotd/td/telegram"
"github.com/celestix/gotgproto"
"github.com/gotd/td/tg"
"go.uber.org/zap"
)

func GetTGMessage(ctx context.Context, client *telegram.Client, messageID int) (*tg.Message, error) {
func GetTGMessage(ctx context.Context, client *gotgproto.Client, messageID int) (*tg.Message, error) {
inputMessageID := tg.InputMessageClass(&tg.InputMessageID{ID: messageID})
channel, err := GetChannelById(ctx, client)
if err != nil {
Expand Down Expand Up @@ -58,16 +59,16 @@ func FileFromMedia(media tg.MessageMediaClass) (*types.File, error) {
return nil, fmt.Errorf("unexpected type %T", media)
}

func FileFromMessage(ctx context.Context, client *telegram.Client, messageID int) (*types.File, error) {
key := fmt.Sprintf("file:%d", messageID)
func FileFromMessage(ctx context.Context, client *gotgproto.Client, messageID int) (*types.File, error) {
key := fmt.Sprintf("file:%d:%d", messageID, client.Self.ID)
log := Logger.Named("GetMessageMedia")
var cachedMedia types.File
err := cache.GetCache().Get(key, &cachedMedia)
if err == nil {
log.Sugar().Debug("Using cached media message properties")
log.Debug("Using cached media message properties", zap.Int("messageID", messageID), zap.Int64("clientID", client.Self.ID))
return &cachedMedia, nil
}
log.Sugar().Debug("Fetching file properties from message ID")
log.Debug("Fetching file properties from message ID", zap.Int("messageID", messageID), zap.Int64("clientID", client.Self.ID))
message, err := GetTGMessage(ctx, client, messageID)
if err != nil {
return nil, err
Expand All @@ -88,7 +89,7 @@ func FileFromMessage(ctx context.Context, client *telegram.Client, messageID int
// TODO: add photo support
}

func GetChannelById(ctx context.Context, client *telegram.Client) (*tg.InputChannel, error) {
func GetChannelById(ctx context.Context, client *gotgproto.Client) (*tg.InputChannel, error) {
channel := &tg.InputChannel{}
inputChannel := &tg.InputChannel{
ChannelID: config.ValueOf.LogChannelID,
Expand Down
Loading

0 comments on commit 7c853c0

Please sign in to comment.