Skip to content

Commit

Permalink
A lot of more comments
Browse files Browse the repository at this point in the history
Also merge the cannot set timeout with the returning error
  • Loading branch information
HirbodBehnam committed May 29, 2020
1 parent fb19562 commit 5163cac
Showing 1 changed file with 86 additions and 42 deletions.
128 changes: 86 additions & 42 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
Expand All @@ -17,45 +18,74 @@ import (
"time"
)

//More global values
// Thread safe rules
var Rules CSafeRule

// The config file name that has the default of rules.json
var ConfigFileName = "rules.json"

// Thread safe simultaneous connections
var SimultaneousConnections CSafeConnections

// Log level (higher = more logs)
var Verbose = 1

// If true that program will save the config file just before it exits
var SaveBeforeExit = true

// The version of program
const Version = "1.4.1 / Build 12"

// A struct to count the active connections on each port
type CSafeConnections struct {
SimultaneousConnections []int
mu sync.RWMutex
}

// Thread safe rules to read and write it
type CSafeRule struct {
Rules []Rule
mu sync.RWMutex
}

// The main rule struct
type Rule struct {
Name string
Listen uint16
Forward string
Quota int64
ExpireDate int64
// Name does not have any effect on anything
Name string
// The port to listen on
Listen uint16
// The destination to forward the packets to
Forward string
// The remaining bytes that the user can use
// Note that this variable is int64 not uint64. Because if it was uin64, it would overflow to 2^64-1.
Quota int64
// The last time this port can be accessed in UTC
// 0 means that this rules does not expire
ExpireDate int64
// Number of simultaneous connections allowed * 2
// 0 means that there is no limit
Simultaneous int
}

// The config file struct
type Config struct {
// Interval of saving files in seconds
SaveDuration int
Timeout int64
Rules []Rule
// The timeout of all connections
// Values equal or lower than 0 disable the timeout
Timeout int64
// All of the forwarding rules
Rules []Rule
}

//Timeout values
// Is timeout enabled?
var EnableTimeOut = true

// The timout value in time.Duration type
var TimeoutDuration time.Duration

func main() {
{ //Parse arguments
{ // Parse arguments
flag.StringVar(&ConfigFileName, "config", "rules.json", "The config filename")
flag.IntVar(&Verbose, "verbose", 1, "Verbose level: 0->None(Mostly Silent), 1->Quota reached, expiry date and typical errors, 2->Connection flood 3->Timeout drops 4->All logs and errors")
help := flag.Bool("h", false, "Show help")
Expand All @@ -77,7 +107,7 @@ func main() {
SaveBeforeExit = !SaveBeforeExit
}

//Read config file
// Read config file
var conf Config
{
confF, err := ioutil.ReadFile(ConfigFileName)
Expand All @@ -101,50 +131,50 @@ func main() {
}
}

//Start listeners
// Start listeners
for index, rule := range Rules.Rules {
go func(i int, loopRule Rule) {
if loopRule.Quota < 0 { //If the quota is already reached why listen for connections?
if loopRule.Quota < 0 { // If the quota is already reached why listen for connections?
log.Println("Skip enabling forward on port", loopRule.Listen, "because the quota is reached.")
return
}
if loopRule.ExpireDate != 0 && loopRule.ExpireDate < time.Now().Unix() {
if loopRule.ExpireDate != 0 && loopRule.ExpireDate < time.Now().Unix() { // Same thing goes with expire date
log.Println("Skip enabling forward on port", loopRule.Listen, "because this rule is expired.")
return
}

log.Println("Forwarding from", loopRule.Listen, "port to", loopRule.Forward)
ln, err := net.Listen("tcp", ":"+strconv.Itoa(int(loopRule.Listen))) //Listen on port
ln, err := net.Listen("tcp", ":"+strconv.Itoa(int(loopRule.Listen))) // Initialize the listener
if err != nil {
panic(err)
panic(err) // This will terminate the program
}

for {
conn, err := ln.Accept() //The loop will be held here
conn, err := ln.Accept() // The loop will be held here;

Rules.mu.RLock() //Lock the mutex to just read the quota
if Rules.Rules[i].Quota < 0 { //Check the quota
Rules.mu.RLock() // Lock the rules mutex to read the quota and expire date
if Rules.Rules[i].Quota < 0 { // Check the quota
Rules.mu.RUnlock()
logVerbose(1, "Quota reached for port", loopRule.Listen, "pointing to", loopRule.Forward)
if err == nil {
_ = conn.Close()
}
saveConfig(conf)
saveConfig(conf) // Force write the config file
break
}
if Rules.Rules[i].ExpireDate != 0 && Rules.Rules[i].ExpireDate < time.Now().Unix() {
if Rules.Rules[i].ExpireDate != 0 && Rules.Rules[i].ExpireDate < time.Now().Unix() { // Check expire date
Rules.mu.RUnlock()
logVerbose(1, "Expire date reached for port", loopRule.Listen, "pointing to", loopRule.Forward)
if err == nil {
_ = conn.Close()
}
saveConfig(conf)
saveConfig(conf) // Force write the config file
break
}
Rules.mu.RUnlock()

if err != nil {
println("Error on accepting connection:", err.Error())
logVerbose(1, "Error on accepting connection:", err.Error())
continue
}

Expand All @@ -153,20 +183,21 @@ func main() {
}(index, rule)
}

//Save config file
// Save config file in intervals
go func() {
sd := conf.SaveDuration
if sd == 0 {
sd = 600
conf.SaveDuration = 600
}
saveInterval := time.Duration(sd) * time.Second
for {
time.Sleep(time.Duration(sd) * time.Second) //Save file every x seconds
time.Sleep(saveInterval) // Save file every x seconds
saveConfig(conf)
}
}()

//https://gobyexample.com/signals
// https://gobyexample.com/signals
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -177,11 +208,12 @@ func main() {
log.Println("Ctrl + C to stop")
<-done
if SaveBeforeExit {
saveConfig(conf) //Save the config file one last time before exiting
saveConfig(conf) // Save the config file one last time before exiting
}
log.Println("Exiting")
}

// Saves the config file
func saveConfig(config Config) {
Rules.mu.RLock() //Lock to read the rules
config.Rules = Rules.Rules
Expand All @@ -196,8 +228,10 @@ func saveConfig(config Config) {
}
}

// All incoming connections end up here
// Index is the rule index
func handleRequest(conn net.Conn, index int, r Rule) {
//Send a clone of rules to here to avoid need of locking mutex
// Send a clone of rules to here to avoid need of locking mutex
SimultaneousConnections.mu.RLock()
if r.Simultaneous != 0 && SimultaneousConnections.SimultaneousConnections[index] >= (r.Simultaneous*2) { //If we have reached quota just terminate the connection; 0 means no limits
logVerbose(2, "Blocking new connection for port", r.Listen, "because the connection limit is reached. The current active connections count is", SimultaneousConnections.SimultaneousConnections[index]/2)
Expand All @@ -207,67 +241,77 @@ func handleRequest(conn net.Conn, index int, r Rule) {
}
SimultaneousConnections.mu.RUnlock()

proxy, err := net.Dial("tcp", r.Forward) //Open a connection to remote host
// Open a connection to remote host
proxy, err := net.Dial("tcp", r.Forward)
if err != nil {
logVerbose(1, "Error on dialing remote host:", err.Error())
_ = conn.Close()
return
}

// Increase the connection count
SimultaneousConnections.mu.Lock()
SimultaneousConnections.SimultaneousConnections[index] += 2 //Two is added; One for client to server and another for server to client
SimultaneousConnections.SimultaneousConnections[index] += 2 // Two is added; One for client to server and another for server to client
logVerbose(4, "Accepting a connection from", conn.RemoteAddr(), "; Now", SimultaneousConnections.SimultaneousConnections[index], "SimultaneousConnections")
SimultaneousConnections.mu.Unlock()

go copyIO(conn, proxy, index)
go copyIO(proxy, conn, index)
go copyIO(conn, proxy, index) // client -> server
go copyIO(proxy, conn, index) // server -> client
}

// Copies the src to dest
// Index is the rule index
func copyIO(src, dest net.Conn, index int) {
defer src.Close()
defer dest.Close()

var r int64 //r is the amount of bytes transferred
// r is the amount of bytes transferred
var r int64
var err error

if EnableTimeOut {
r, err = copyBuffer(src, dest)
r, err = copyBuffer(dest, src)
} else {
r, err = io.Copy(src, dest)
r, err = io.Copy(dest, src) // if timeout is not enabled just use the original io.copy
}

if err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
logVerbose(3, "A connection timed out from", src.RemoteAddr(), "to", dest.RemoteAddr())
} else if strings.HasPrefix(err.Error(), "cannot set timeout for") {
if strings.HasSuffix(err.Error(), "use of closed network connection") {
logVerbose(4, err.Error())
} else {
logVerbose(1, err.Error())
}
} else {
logVerbose(4, "Error on copyBuffer(Usually happens):", err.Error())
logVerbose(4, "Error on copyBuffer:", err.Error())
}

}

Rules.mu.Lock() //Lock to change the amount of data transferred
Rules.mu.Lock() // lock to change the amount of data transferred
Rules.Rules[index].Quota -= r
Rules.mu.Unlock()

SimultaneousConnections.mu.Lock()
SimultaneousConnections.SimultaneousConnections[index]-- //This will actually run twice
SimultaneousConnections.SimultaneousConnections[index]-- // this will run twice
logVerbose(4, "Closing a connection from", src.RemoteAddr(), "; Connections Now:", SimultaneousConnections.SimultaneousConnections[index])
SimultaneousConnections.mu.Unlock()
}

func copyBuffer(dst, src net.Conn) (written int64, err error) {
buf := make([]byte, 32768)
buf := make([]byte, 32768) // 32kb buffer
for {
err = src.SetDeadline(time.Now().Add(TimeoutDuration))
if err != nil {
logVerbose(1, "cannot set timeout for src")
err = errors.New("cannot set timeout for src: " + err.Error())
break
}
nr, er := src.Read(buf)
if nr > 0 {
err = dst.SetDeadline(time.Now().Add(TimeoutDuration))
if err != nil {
logVerbose(1, "cannot set timeout for dest")
err = errors.New("cannot set timeout for dest: " + err.Error())
break
}
nw, ew := dst.Write(buf[0:nr])
Expand Down

0 comments on commit 5163cac

Please sign in to comment.