forked from raedahgroup/dcrtxmatcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dcrtxmatcher.go
145 lines (116 loc) · 3.08 KB
/
dcrtxmatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"fmt"
"net"
"net/http"
"os"
"github.com/gorilla/websocket"
pb "github.com/raedahgroup/dcrtxmatcher/api/matcherrpc"
"github.com/raedahgroup/dcrtxmatcher/coinjoin"
"github.com/raedahgroup/dcrtxmatcher/matcher"
"google.golang.org/grpc"
)
func main() {
// Create a context that is cancelled when a shutdown request is received
// through an interrupt signal or an RPC request.
ctx := withShutdownCancel(context.Background())
go shutdownListener()
if err := run(ctx); err != nil && err != context.Canceled {
os.Exit(1)
}
}
func run(ctx context.Context) error {
config, _, err := loadConfig(ctx)
if err != nil {
log.Errorf("loadConfig error %v", err)
return err
}
if done(ctx) {
return ctx.Err()
}
if config.BlindServer {
dcmixlog.Infof("MinParticipants %d", config.MinParticipants)
dicemixCfg := &coinjoin.Config{
MinParticipants: config.MinParticipants,
RandomIndex: config.RandomIndex,
JoinTicker: config.JoinTicker,
WaitingTimer: config.WaitingTimer,
}
//websocket
joinQueue := coinjoin.NewJoinQueue()
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: 1024 * 1024,
WriteBufferSize: 1024 * 1024,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Infof("Can not upgrade from remote address %v", r.RemoteAddr)
return
}
//add ws connection to dicemix for management
peer := coinjoin.NewPeer(conn)
peer.IPAddr = r.RemoteAddr
joinQueue.NewPeerChan <- peer
})
diceMix := coinjoin.NewDiceMix(dicemixCfg)
go diceMix.Run(joinQueue)
intf := fmt.Sprintf(":%d", config.Port)
dcmixlog.Infof("Listening on %s", intf)
http.ListenAndServe(intf, nil)
} else {
mcfg := &matcher.Config{
MinParticipants: config.MinParticipants,
RandomIndex: config.RandomIndex,
JoinTicker: config.JoinTicker,
WaitingTimer: config.WaitingTimer,
}
//set matcher config
ticketJoiner := matcher.NewTicketJoiner(mcfg)
waitingQueue := matcher.NewWaitingQueue()
intf := fmt.Sprintf(":%d", config.Port)
lis, err := net.Listen("tcp", intf)
if err != nil {
log.Errorf("Error listening: %v", err)
return err
}
if done(ctx) {
return ctx.Err()
}
go ticketJoiner.Run(waitingQueue)
server := grpc.NewServer()
pb.RegisterSplitTxMatcherServiceServer(server, NewSplitTxMatcherService(ticketJoiner, waitingQueue))
if done(ctx) {
return ctx.Err()
}
log.Infof("Listening on %s", intf)
go server.Serve(lis)
if server != nil {
defer func() {
log.Info("Stop Grpc server...")
server.Stop()
log.Info("Grpc server stops")
}()
}
if ticketJoiner != nil {
defer func() {
log.Info("Stop Ticket joiner...")
ticketJoiner.Stop(config.CompleteJoin)
log.Info("Ticket joiner stops")
}()
}
}
<-ctx.Done()
return ctx.Err()
}
// done returns whether the context's Done channel was closed due to
// cancellation or exceeded deadline.
func done(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}