Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: streaming CSV encodes #1800

Merged
merged 1 commit into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/apirouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ func NewFileRouter(app *appContext, useRealIP bool) fileMux {
mux.Route("/address", func(rd chi.Router) {
// Allow browser cache for 3 minutes.
rd.Use(m.CacheControl(180))
rd.With(m.AddressPathCtxN(1)).Get("/io/{address}", app.addressIoCsv)
// The carriage return option is handled on the path to facilitate more
// effective caching in downstream delivery.
rd.With(m.AddressPathCtxN(1)).Get("/io/{address}", app.addressIoCsvNoCR)
rd.With(m.AddressPathCtxN(1)).Get("/io/{address}/win", app.addressIoCsvCR)
})

return fileMux{mux}
Expand Down
116 changes: 61 additions & 55 deletions api/apiroutes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package api

import (
"bytes"
"context"
"database/sql"
"encoding/binary"
Expand Down Expand Up @@ -63,7 +62,7 @@ type DataSource interface {
TicketPoolVisualization(interval dbtypes.TimeBasedGrouping) (
*dbtypes.PoolTicketsData, *dbtypes.PoolTicketsData, *dbtypes.PoolTicketsData, int64, error)
AgendaVotes(agendaID string, chartType int) (*dbtypes.AgendaVoteChoices, error)
AddressTxIoCsv(address string) ([][]string, error)
AddressRowsCompact(address string) ([]*dbtypes.AddressRowCompact, error)
Height() int64
AllAgendas() (map[string]dbtypes.MileStone, error)
GetTicketInfo(txid string) (*apitypes.TicketInfo, error)
Expand Down Expand Up @@ -290,45 +289,6 @@ func writeJSONBytes(w http.ResponseWriter, data []byte) {
}
}

// writeCSV renders rows as CSV into an in-memory buffer, sets the
// Content-Disposition, Content-Type, and Content-Length response headers, and
// then transmits the buffer to the client. Buffering the entire document
// before writing is what makes an accurate Content-Length possible. useCRLF
// selects Windows-style ("\r\n") record terminators.
func writeCSV(w http.ResponseWriter, rows [][]string, filename string, useCRLF bool) {
	w.Header().Set("Content-Disposition",
		fmt.Sprintf("attachment;filename=%s", filename))
	w.Header().Set("Content-Type", "text/csv")

	// Encode everything up front so the total length is known before any of
	// the response body is written.
	var buf bytes.Buffer
	csvWriter := csv.NewWriter(&buf)
	csvWriter.UseCRLF = useCRLF
	if err := csvWriter.WriteAll(rows); err != nil {
		log.Errorf("Failed to write address rows to buffer: %v", err)
		http.Error(w, http.StatusText(http.StatusInternalServerError),
			http.StatusInternalServerError)
		return
	}

	size := int64(buf.Len())
	w.Header().Set("Content-Length", strconv.FormatInt(size, 10))

	sent, err := buf.WriteTo(w)
	if err != nil {
		log.Errorf("Failed to transfer address rows from buffer. "+
			"%d bytes written. %v", sent, err)
		http.Error(w, http.StatusText(http.StatusInternalServerError),
			http.StatusInternalServerError)
		return
	}

	// Warn when the client received fewer bytes than were buffered.
	if sent != size {
		log.Warnf("Failed to send the entire file. Sent %d of %d bytes.",
			sent, size)
	}
}

func getVoteVersionQuery(r *http.Request) (int32, string, error) {
verLatest := int64(m.GetLatestVoteVersionCtx(r))
voteVersion := r.URL.Query().Get("version")
Expand Down Expand Up @@ -1515,19 +1475,20 @@ func (c *appContext) addressExists(w http.ResponseWriter, r *http.Request) {
writeJSON(w, exists, m.GetIndentCtx(r))
}

// Handler for address activity CSV file download.
// /download/address/io/{address}?cr=[true|false]
func (c *appContext) addressIoCsv(w http.ResponseWriter, r *http.Request) {
// Check if ?cr=true was specified.
var useCRLF bool
if crlfParam := r.URL.Query().Get("cr"); crlfParam != "" {
b, err := strconv.ParseBool(crlfParam)
if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
// addressIoCsvNoCR serves the address I/O CSV download with bare LF ("\n")
// line endings. See addressIoCsv for the streaming implementation.
func (c *appContext) addressIoCsvNoCR(w http.ResponseWriter, r *http.Request) {
	c.addressIoCsv(false, w, r)
}
// addressIoCsvCR serves the address I/O CSV download with Windows-style CRLF
// ("\r\n") line endings. See addressIoCsv for the streaming implementation.
func (c *appContext) addressIoCsvCR(w http.ResponseWriter, r *http.Request) {
	c.addressIoCsv(true, w, r)
}

useCRLF = b
// Handler for address activity CSV file download.
// /download/address/io/{address}[/win]
func (c *appContext) addressIoCsv(crlf bool, w http.ResponseWriter, r *http.Request) {
wf, ok := w.(http.Flusher)
if !ok {
http.Error(w, "unable to flush streamed data", http.StatusBadRequest)
return
}

addresses, err := m.GetAddressCtx(r, c.Params)
Expand All @@ -1544,7 +1505,11 @@ func (c *appContext) addressIoCsv(w http.ResponseWriter, r *http.Request) {
return
}

rows, err := c.DataSource.AddressTxIoCsv(address)
// TODO: Improve the DB component also to avoid retrieving all row data
// and/or put a hard limit on the number of rows that can be retrieved.
// However, it is a slice of pointers, and they are also in the address
// cache and thus shared across calls to the same address.
rows, err := c.DataSource.AddressRowsCompact(address)
if err != nil {
log.Errorf("Failed to fetch AddressTxIoCsv: %v", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
Expand All @@ -1553,8 +1518,49 @@ func (c *appContext) addressIoCsv(w http.ResponseWriter, r *http.Request) {

filename := fmt.Sprintf("address-io-%s-%d-%s.csv", address,
c.Status.Height(), strconv.FormatInt(time.Now().Unix(), 10))
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment;filename=%s", filename))
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
w.WriteHeader(http.StatusOK)
writer := csv.NewWriter(w)
writer.UseCRLF = crlf

err = writer.Write([]string{"tx_hash", "direction", "io_index",
"valid_mainchain", "value", "time_stamp", "tx_type", "matching_tx_hash"})
if err != nil {
return // too late to write an error code
}
writer.Flush()
wf.Flush()

writeCSV(w, rows, filename, useCRLF)
var strValidMainchain, strDirection string
for _, r := range rows {
if r.ValidMainChain {
strValidMainchain = "1"
} else {
strValidMainchain = "0"
}
if r.IsFunding {
strDirection = "1"
} else {
strDirection = "-1"
}

err = writer.Write([]string{
r.TxHash.String(),
strDirection,
strconv.FormatUint(uint64(r.TxVinVoutIndex), 10),
strValidMainchain,
strconv.FormatFloat(dcrutil.Amount(r.Value).ToCoin(), 'f', -1, 64),
strconv.FormatInt(r.TxBlockTime, 10),
txhelpers.TxTypeToString(int(r.TxType)),
r.MatchingTxHash.String(),
})
if err != nil {
return // too late to write an error code
}
writer.Flush()
wf.Flush()
}
}

func (c *appContext) getAddressTxTypesData(w http.ResponseWriter, r *http.Request) {
Expand Down
156 changes: 14 additions & 142 deletions db/dcrpg/pgblockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"math"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1639,25 +1638,33 @@ func (pgb *ChainDB) AddressTransactions(address string, N, offset int64,
}

// AddressTransactionsAll retrieves all non-merged main chain addresses table
// rows for the given address.
// rows for the given address. There is presently a hard limit of 3 million rows
// that may be returned, which is more than 4x the count for the treasury
// address as of mainnet block 521900.
func (pgb *ChainDB) AddressTransactionsAll(address string) (addressRows []*dbtypes.AddressRow, err error) {
	ctx, cancel := context.WithTimeout(pgb.ctx, pgb.queryTimeout)
	defer cancel()

	// Hard cap on the number of rows retrieved; see the doc comment for the
	// rationale behind the figure.
	const limit = 3000000
	rows, err := RetrieveAddressTxns(ctx, pgb.db, address, limit, 0)
	return rows, pgb.replaceCancelError(err)
}

// AddressTransactionsAllMerged retrieves all merged (stakeholder-approved and
// mainchain only) addresses table rows for the given address.
// mainchain only) addresses table rows for the given address. There is
// presently a hard limit of 3 million rows that may be returned, which is more
// than 4x the count for the treasury address as of mainnet block 521900.
func (pgb *ChainDB) AddressTransactionsAllMerged(address string) (addressRows []*dbtypes.AddressRow, err error) {
	ctx, cancel := context.WithTimeout(pgb.ctx, pgb.queryTimeout)
	defer cancel()

	// Hard cap on the number of rows retrieved; see the doc comment for the
	// rationale behind the figure.
	const limit = 3000000
	rows, err := RetrieveAddressMergedTxns(ctx, pgb.db, address, limit, 0)
	return rows, pgb.replaceCancelError(err)
}
Expand Down Expand Up @@ -2611,141 +2618,6 @@ func (pgb *ChainDB) AddressTotals(address string) (*apitypes.AddressTotals, erro
}, nil
}

// MakeCsvAddressRows converts an AddressRow slice into a [][]string, including
// column headers, suitable for saving to CSV.
func MakeCsvAddressRows(rows []*dbtypes.AddressRow) [][]string {
	// Reserve one extra slot for the header row.
	csvRows := make([][]string, 0, len(rows)+1)
	csvRows = append(csvRows, []string{"tx_hash", "direction", "io_index",
		"valid_mainchain", "value", "time_stamp", "tx_type", "matching_tx_hash"})

	for _, row := range rows {
		// Booleans are encoded for the CSV consumers as: valid_mainchain in
		// {0,1}, and direction as 1 (funding) or -1 (spending).
		mainchain := "0"
		if row.ValidMainChain {
			mainchain = "1"
		}
		direction := "-1"
		if row.IsFunding {
			direction = "1"
		}

		csvRows = append(csvRows, []string{
			row.TxHash,
			direction,
			strconv.Itoa(int(row.TxVinVoutIndex)),
			mainchain,
			strconv.FormatFloat(dcrutil.Amount(row.Value).ToCoin(), 'f', -1, 64),
			strconv.FormatInt(row.TxBlockTime.UNIX(), 10),
			txhelpers.TxTypeToString(int(row.TxType)),
			row.MatchingTxHash,
		})
	}
	return csvRows
}

// MakeCsvAddressRowsCompact converts an AddressRowCompact slice into a
// [][]string, including column headers, suitable for saving to CSV.
func MakeCsvAddressRowsCompact(rows []*dbtypes.AddressRowCompact) [][]string {
	// Reserve one extra slot for the header row.
	csvRows := make([][]string, 0, len(rows)+1)
	csvRows = append(csvRows, []string{"tx_hash", "direction", "io_index",
		"valid_mainchain", "value", "time_stamp", "tx_type", "matching_tx_hash"})

	for _, row := range rows {
		// Booleans are encoded for the CSV consumers as: valid_mainchain in
		// {0,1}, and direction as 1 (funding) or -1 (spending).
		mainchain := "0"
		if row.ValidMainChain {
			mainchain = "1"
		}
		direction := "-1"
		if row.IsFunding {
			direction = "1"
		}

		csvRows = append(csvRows, []string{
			row.TxHash.String(),
			direction,
			strconv.Itoa(int(row.TxVinVoutIndex)),
			mainchain,
			strconv.FormatFloat(dcrutil.Amount(row.Value).ToCoin(), 'f', -1, 64),
			strconv.FormatInt(row.TxBlockTime, 10),
			txhelpers.TxTypeToString(int(row.TxType)),
			row.MatchingTxHash.String(),
		})
	}
	return csvRows
}

// AddressTxIoCsv grabs rows of an address' transaction input/output data as a
// 2-D array of strings to be CSV-formatted.
//
// NOTE(review): an alternative implementation that read the address rows cache
// without updating it on a miss (via retrieveAddressIoCsv) was previously kept
// here as commented-out code; consult version control history if it is needed.
func (pgb *ChainDB) AddressTxIoCsv(address string) ([][]string, error) {
	// AddressRowsCompact consults the address cache before falling back to the
	// database.
	rows, err := pgb.AddressRowsCompact(address)
	if err != nil {
		return nil, err
	}
	return MakeCsvAddressRowsCompact(rows), nil
}

func (pgb *ChainDB) addressInfo(addr string, count, skip int64, txnType dbtypes.AddrTxnViewType) (*dbtypes.AddressInfo, *dbtypes.AddressBalance, error) {
address, err := dcrutil.DecodeAddress(addr, pgb.chainParams)
if err != nil {
Expand Down
Loading