Skip to content

Commit

Permalink
various TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin committed Jan 9, 2025
1 parent 8b2ab02 commit 03d1422
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
35 changes: 18 additions & 17 deletions da/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S

// Set defaults
c.pubsubServer = pubsubServer

// TODO: Make configurable
c.txInclusionTimeout = defaultTxInculsionTimeout
c.batchRetryDelay = defaultBatchRetryDelay
c.batchRetryAttempts = defaultBatchRetryAttempts
Expand All @@ -125,6 +127,14 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
apply(c)
}

types.RollappConsecutiveFailedDASubmission.Set(0)

return nil
}

// Start starts DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Start() error {
c.ctx, c.cancel = context.WithCancel(context.Background())
// If client wasn't set, create a new one
if c.client == nil {
substrateApiClient, err := gsrpc.NewSubstrateAPI(c.config.ApiURL)
Expand All @@ -138,14 +148,7 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
}
}

types.RollappConsecutiveFailedDASubmission.Set(0)

c.ctx, c.cancel = context.WithCancel(context.Background())
return nil
}

// Start starts DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Start() error {
// TODO: should actually check for synced client
c.synced <- struct{}{}
return nil
}
Expand All @@ -169,6 +172,8 @@ func (c *DataAvailabilityLayerClient) GetClientType() da.Client {

// RetrieveBatches retrieves batch from DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
// TODO: add retries (better to refactor and add retries by the caller)

//nolint:typecheck
blockHash, err := c.client.GetBlockHash(daMetaData.Height)
if err != nil {
Expand Down Expand Up @@ -199,6 +204,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
ext.Method.CallIndex.MethodIndex == DataCallMethodIndex {

data := ext.Method.Args
// FIXME: potential deadlock on parsing error
for 0 < len(data) {
var pbBatch pb.Batch
err := proto.Unmarshal(data, &pbBatch)
Expand All @@ -222,6 +228,8 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
}
}

// TODO: if no batches, return error

return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Expand Down Expand Up @@ -256,13 +264,7 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result
for {
select {
case <-c.ctx.Done():
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "context done",
Error: c.ctx.Err(),
},
}
return da.ResultSubmitBatch{}
default:
var daBlockHeight uint64
err := retry.Do(
Expand Down Expand Up @@ -378,11 +380,10 @@ func (c *DataAvailabilityLayerClient) broadcastTx(tx []byte) (uint64, error) {
if err != nil {
return 0, fmt.Errorf("%w: %s", da.ErrTxBroadcastNetworkError, err)
}
defer sub.Unsubscribe()

c.logger.Info("Submitted batch to avail. Waiting for inclusion event")

defer sub.Unsubscribe()

inclusionTimer := time.NewTimer(c.txInclusionTimeout)
defer inclusionTimer.Stop()

Expand Down
4 changes: 0 additions & 4 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
// Stop stops DataAvailabilityLayerClient.
func (c *DataAvailabilityLayerClient) Stop() error {
c.logger.Info("Stopping Celestia Data Availability Layer Client.")
err := c.pubsubServer.Stop()
if err != nil {
return err
}
c.cancel()
close(c.synced)
return nil
Expand Down

0 comments on commit 03d1422

Please sign in to comment.