Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating the timeouts to make pass 1000 votes #16

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/execution/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (ns *Service) Execute(snap store.Snapshot, step execution.Step) (execution.
err := contract.Execute(snap, step)
if err != nil {
res.Accepted = false
// LG: DEBUG - I'd like to keep this commented line, as it helps debugging.
// res.Message = fmt.Sprintf("%+v", err)
res.Message = err.Error()
}

Expand Down
2 changes: 1 addition & 1 deletion core/ordering/cosipbft/cosipbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import (
const (
// DefaultRoundTimeout is the maximum round time the service waits
// for an event to happen.
DefaultRoundTimeout = 10 * time.Second
DefaultRoundTimeout = 200 * time.Second

// DefaultFailedRoundTimeout is the maximum round time the service waits
// for an event to happen, after a round has failed, thus letting time
Expand Down
2 changes: 1 addition & 1 deletion core/ordering/cosipbft/fastsync/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (s fastSync) requestSync(

catchup, ok := msg.(types.CatchupMessage)
if ok {
s.logger.Trace().Msgf("Got %d blocks from %v",
s.logger.Info().Msgf("Got %d fastsync blocks from %v",
len(catchup.GetBlockLinks()), from)

replies[from.String()] = struct{}{}
Expand Down
1 change: 1 addition & 0 deletions mino/minogrpc/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func NewMinogrpc(listen net.Addr, public *url.URL, router router.Router, opts ..
otgrpc.SpanDecorator(decorateServerTrace))),
grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer,
otgrpc.SpanDecorator(decorateServerTrace))),
grpc.MaxRecvMsgSize(session.MaxMessageSize),
}

if !tmpl.serveTLS {
Expand Down
21 changes: 15 additions & 6 deletions mino/minogrpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package minogrpc

import (
context "context"
"sync"

"github.com/rs/xid"
"go.dedis.ch/dela"
"go.dedis.ch/dela/internal/tracing"
Expand All @@ -15,10 +17,10 @@ import (
"go.dedis.ch/dela/mino/minogrpc/session"
"go.dedis.ch/dela/serde"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"sync"
)

// RPC represents an RPC that has been registered by a client, which allows
Expand All @@ -32,8 +34,10 @@ type RPC struct {
}

// Call implements mino.RPC. It calls the RPC on each provided address.
func (rpc *RPC) Call(ctx context.Context,
req serde.Message, players mino.Players) (<-chan mino.Response, error) {
func (rpc *RPC) Call(
ctx context.Context,
req serde.Message, players mino.Players,
) (<-chan mino.Response, error) {

data, err := req.Serialize(rpc.overlay.context)
if err != nil {
Expand Down Expand Up @@ -75,7 +79,8 @@ func (rpc *RPC) Call(ctx context.Context,
header := metadata.New(map[string]string{headerURIKey: rpc.uri})
newCtx := metadata.NewOutgoingContext(ctx, header)

callResp, err := cl.Call(newCtx, sendMsg)
callResp, err := cl.Call(newCtx, sendMsg,
grpc.MaxCallRecvMsgSize(session.MaxMessageSize))
if err != nil {
resp := mino.NewResponseWithError(
addr,
Expand Down Expand Up @@ -134,7 +139,11 @@ func (rpc *RPC) Call(ctx context.Context,
// If C has to send a message to B, it will send it through node A. Similarly,
// if D has to send a message to G, it will move up the tree through B, A and
// finally C.
func (rpc RPC) Stream(ctx context.Context, players mino.Players) (mino.Sender, mino.Receiver, error) {
func (rpc RPC) Stream(ctx context.Context, players mino.Players) (
mino.Sender,
mino.Receiver,
error,
) {
if players == nil || players.Len() == 0 {
return nil, nil, xerrors.New("empty list of addresses")
}
Expand Down Expand Up @@ -180,7 +189,7 @@ func (rpc RPC) Stream(ctx context.Context, players mino.Players) (mino.Sender, m

ctx = metadata.NewOutgoingContext(ctx, md)

stream, err := client.Stream(ctx)
stream, err := client.Stream(ctx, grpc.MaxCallRecvMsgSize(session.MaxMessageSize))
if err != nil {
rpc.overlay.connMgr.Release(gw)

Expand Down
40 changes: 28 additions & 12 deletions mino/minogrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ type overlayServer struct {
// Join implements ptypes.OverlayServer. It processes the request by checking
// the validity of the token and if it is accepted, by sending the certificate
// to the known peers. It finally returns the certificates to the caller.
func (o overlayServer) Join(ctx context.Context, req *ptypes.JoinRequest) (*ptypes.JoinResponse, error) {
func (o overlayServer) Join(ctx context.Context, req *ptypes.JoinRequest) (
*ptypes.JoinResponse,
error,
) {
// 1. Check validity of the token.
if !o.tokens.Verify(req.Token) {
return nil, xerrors.Errorf("token '%s' is invalid", req.Token)
Expand Down Expand Up @@ -109,7 +112,8 @@ func (o overlayServer) Join(ctx context.Context, req *ptypes.JoinRequest) (*ptyp

client := ptypes.NewOverlayClient(conn)

_, err = client.Share(ctx, req.GetChain())
_, err = client.Share(ctx, req.GetChain(),
grpc.MaxCallRecvMsgSize(session.MaxMessageSize))
if err != nil {
res <- xerrors.Errorf("couldn't call share: %v", err)
return
Expand All @@ -135,7 +139,10 @@ func (o overlayServer) Join(ctx context.Context, req *ptypes.JoinRequest) (*ptyp

// Share implements ptypes.OverlayServer. It accepts a certificate from a
// participant only if it is valid from the address it claims to be.
func (o overlayServer) Share(ctx context.Context, msg *ptypes.CertificateChain) (*ptypes.CertificateAck, error) {
func (o overlayServer) Share(
ctx context.Context,
msg *ptypes.CertificateChain,
) (*ptypes.CertificateAck, error) {
from := o.addrFactory.FromText(msg.GetAddress()).(session.Address)

hostname, err := from.GetHostname()
Expand Down Expand Up @@ -579,7 +586,7 @@ func (o *overlay) Join(addr *url.URL, token string, certHash []byte) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp, err := client.Join(ctx, req)
resp, err := client.Join(ctx, req, grpc.MaxCallRecvMsgSize(session.MaxMessageSize))
if err != nil {
return xerrors.Errorf("couldn't call join: %v", err)
}
Expand Down Expand Up @@ -660,11 +667,13 @@ func (mgr *connManager) Acquire(to mino.Address) (grpc.ClientConnInterface, erro
Backoff: backoff.DefaultConfig,
MinConnectTimeout: defaultMinConnectTimeout,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(session.MaxMessageSize)),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.SpanDecorator(decorateClientTrace)),
),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer, otgrpc.SpanDecorator(decorateClientTrace)),
otgrpc.OpenTracingStreamClientInterceptor(tracer,
otgrpc.SpanDecorator(decorateClientTrace)),
),
}

Expand Down Expand Up @@ -697,7 +706,10 @@ func (mgr *connManager) Acquire(to mino.Address) (grpc.ClientConnInterface, erro
return conn, nil
}

func (mgr *connManager) getTransportCredential(addr mino.Address) (credentials.TransportCredentials, error) {
func (mgr *connManager) getTransportCredential(addr mino.Address) (
credentials.TransportCredentials,
error,
) {
clientChain, err := mgr.certs.Load(addr)
if err != nil {
return nil, xerrors.Errorf("while loading distant cert: %v", err)
Expand Down Expand Up @@ -734,10 +746,12 @@ func (mgr *connManager) getTransportCredential(addr mino.Address) (credentials.T
}

ta := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{{
Certificate: [][]byte{meCerts[0].Raw},
Leaf: meCerts[0],
}},
Certificates: []tls.Certificate{
{
Certificate: [][]byte{meCerts[0].Raw},
Leaf: meCerts[0],
},
},
RootCAs: pool,
MinVersion: tls.VersionTLS12,
})
Expand Down Expand Up @@ -808,8 +822,10 @@ func uriFromContext(ctx context.Context) string {

// decorateClientTrace adds the protocol tag and the streamID tag to a client
// side trace.
func decorateClientTrace(ctx context.Context, span opentracing.Span, method string,
req, resp interface{}, grpcError error) {
func decorateClientTrace(
ctx context.Context, span opentracing.Span, method string,
req, resp interface{}, grpcError error,
) {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
return
Expand Down
25 changes: 20 additions & 5 deletions mino/minogrpc/session/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import (
// HandshakeKey is the key to the handshake store in the headers.
const HandshakeKey = "handshake"

// MaxMessageSize that will be sent using grpc
var MaxMessageSize = int(1e9)

// ConnectionManager is an interface required by the session to open and release
// connections to the relays.
type ConnectionManager interface {
Expand Down Expand Up @@ -360,7 +363,13 @@ func (s *session) sendPacket(p parent, pkt router.Packet, errs chan error) bool
return true
}

func (s *session) sendTo(p parent, to mino.Address, pkt router.Packet, errs chan error, wg *sync.WaitGroup) {
func (s *session) sendTo(
p parent,
to mino.Address,
pkt router.Packet,
errs chan error,
wg *sync.WaitGroup,
) {
defer wg.Done()

var relay Relay
Expand Down Expand Up @@ -438,7 +447,10 @@ func (s *session) setupRelay(p parent, addr mino.Address) (Relay, error) {

cl := ptypes.NewOverlayClient(conn)

stream, err := cl.Stream(ctx, grpc.WaitForReady(false))
stream, err := cl.Stream(ctx,
grpc.WaitForReady(false),
grpc.MaxCallRecvMsgSize(MaxMessageSize),
)
if err != nil {
s.connMgr.Release(addr)
return nil, xerrors.Errorf("client: %v", err)
Expand Down Expand Up @@ -546,8 +558,10 @@ type unicastRelay struct {

// NewRelay returns a new relay that will send messages to the gateway through
// unicast requests.
func NewRelay(stream PacketStream, gw mino.Address,
ctx serde.Context, conn grpc.ClientConnInterface, md metadata.MD) Relay {
func NewRelay(
stream PacketStream, gw mino.Address,
ctx serde.Context, conn grpc.ClientConnInterface, md metadata.MD,
) Relay {

r := &unicastRelay{
md: md,
Expand Down Expand Up @@ -583,7 +597,8 @@ func (r *unicastRelay) Send(ctx context.Context, p router.Packet) (*ptypes.Ack,

ctx = metadata.NewOutgoingContext(ctx, r.md)

ack, err := client.Forward(ctx, &ptypes.Packet{Serialized: data})
ack, err := client.Forward(ctx, &ptypes.Packet{Serialized: data},
grpc.MaxCallRecvMsgSize(MaxMessageSize))
if err != nil {
return nil, xerrors.Errorf("client: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package dela

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -73,6 +74,7 @@ func init() {
default:
level = zerolog.TraceLevel
}
fmt.Println("LogLevel is:", logLevel, level)

Logger = Logger.Level(level)
PromCollectors = append(PromCollectors, promWarns, promErrs)
Expand Down