Skip to content
This repository has been archived by the owner on Dec 12, 2024. It is now read-only.

Commit

Permalink
Batch Writes (#252)
Browse files Browse the repository at this point in the history
* ctx for db write

* builds

* fixed build and tests

* adding batch writes to create credential

* fix linter

Co-authored-by: Gabe <[email protected]>
  • Loading branch information
nitro-neal and decentralgabe authored Jan 17, 2023
1 parent 3ece608 commit bf882d1
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 15 deletions.
45 changes: 39 additions & 6 deletions pkg/service/credential/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s Service) CreateCredential(ctx context.Context, request CreateCredentialR

logrus.Debugf("creating credential: %+v", request)

writeContexts := make([]WriteContext, 0)
builder := credential.NewVerifiableCredentialBuilder()

if err := builder.SetIssuer(request.Issuer); err != nil {
Expand Down Expand Up @@ -176,6 +177,34 @@ func (s Service) CreateCredential(ctx context.Context, request CreateCredentialR
if err := builder.SetCredentialStatus(status); err != nil {
return nil, util.LoggingErrorMsg(err, "could not set credential status")
}

statusListCredJWT, err := s.signCredentialJWT(ctx, request.Issuer, *statusListCredential)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not sign status list credential")
}

statusListContainer := credint.Container{
ID: statusListCredential.ID,
Credential: statusListCredential,
CredentialJWT: statusListCredJWT,
}

statusListStorageRequest := StoreCredentialRequest{
Container: statusListContainer,
}

statusListIndexWriteContext, err := s.storage.GetIncrementStatusListIndexWriteContext(ctx)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not get status list index write context")
}

statusListCredWriteContext, err := s.storage.GetStoreStatusListCredentialWriteContext(statusListStorageRequest)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not get status list credential write context")
}

writeContexts = append(writeContexts, *statusListIndexWriteContext)
writeContexts = append(writeContexts, *statusListCredWriteContext)
}

cred, err := builder.Build()
Expand All @@ -191,29 +220,33 @@ func (s Service) CreateCredential(ctx context.Context, request CreateCredentialR
}

// TODO(gabe) support Data Integrity creds too https://github.com/TBD54566975/ssi-service/issues/105
// sign the credential
credJWT, err := s.signCredentialJWT(ctx, request.Issuer, *cred)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not sign credential")
}

// store the credential
container := credint.Container{
ID: cred.ID,
Credential: cred,
CredentialJWT: credJWT,
Revoked: false,
}

storageRequest := StoreCredentialRequest{
credentialStorageRequest := StoreCredentialRequest{
Container: container,
}

if err = s.storage.StoreCredential(ctx, storageRequest); err != nil {
return nil, util.LoggingErrorMsg(err, "could not store credential")
credWriteContext, err := s.storage.GetStoreCredentialWriteContext(credentialStorageRequest)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not get credential write context")

}
writeContexts = append(writeContexts, *credWriteContext)

if err = s.storage.WriteMany(ctx, writeContexts); err != nil {
return nil, util.LoggingErrorMsg(err, "failed to save vc")
}

// return the result
response := CreateCredentialResponse{Container: container}
return &response, nil
}
Expand Down
91 changes: 82 additions & 9 deletions pkg/service/credential/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type StoredCredential struct {
Revoked bool `json:"revoked"`
}

type WriteContext struct {
namespace string
key string
value []byte
}

func (sc StoredCredential) IsValid() bool {
return sc.ID != "" && (sc.HasDataIntegrityCredential() || sc.HasJWTCredential())
}
Expand Down Expand Up @@ -127,16 +133,59 @@ func (cs *Storage) GetNextStatusListRandomIndex(ctx context.Context) (int, error
return -1, util.LoggingErrorMsgf(err, "could not unmarshal unique numbers")
}

return uniqueNums[statusListIndex.Index], nil
}

func (cs *Storage) WriteMany(ctx context.Context, writeContexts []WriteContext) error {
namespaces := make([]string, 0)
keys := make([]string, 0)
values := make([][]byte, 0)

for i := range writeContexts {
namespaces = append(namespaces, writeContexts[i].namespace)
keys = append(keys, writeContexts[i].key)
values = append(values, writeContexts[i].value)
}

return cs.db.WriteMany(ctx, namespaces, keys, values)
}

func (cs *Storage) IncrementStatusListIndex(ctx context.Context) error {
wc, err := cs.GetIncrementStatusListIndexWriteContext(ctx)
if err != nil {
return util.LoggingErrorMsg(err, "problem getting increment status listIndex writeContext")
}

if err := cs.db.Write(ctx, wc.namespace, wc.key, wc.value); err != nil {
return util.LoggingErrorMsg(err, "problem writing current list index to db")
}

return nil
}

func (cs *Storage) GetIncrementStatusListIndexWriteContext(ctx context.Context) (*WriteContext, error) {
gotCurrentListIndexBytes, err := cs.db.Read(ctx, statusListIndexNamespace, currentListIndexKey)
if err != nil {
return nil, util.LoggingErrorMsgf(err, "could not get list index")
}

var statusListIndex StatusListIndex
if err = json.Unmarshal(gotCurrentListIndexBytes, &statusListIndex); err != nil {
return nil, util.LoggingErrorMsgf(err, "could not unmarshal unique numbers")
}

statusListIndexBytes, err := json.Marshal(StatusListIndex{Index: statusListIndex.Index + 1})
if err != nil {
return -1, util.LoggingErrorMsg(err, "could not marshal status list index bytes")
return nil, util.LoggingErrorMsg(err, "could not marshal status list index bytes")
}

if err := cs.db.Write(ctx, statusListIndexNamespace, currentListIndexKey, statusListIndexBytes); err != nil {
return -1, util.LoggingErrorMsg(err, "problem writing current list index to db")
wc := WriteContext{
namespace: statusListIndexNamespace,
key: currentListIndexKey,
value: statusListIndexBytes,
}

return uniqueNums[statusListIndex.Index], nil
return &wc, nil
}

func (cs *Storage) StoreCredential(ctx context.Context, request StoreCredentialRequest) error {
Expand All @@ -148,22 +197,46 @@ func (cs *Storage) StoreStatusListCredential(ctx context.Context, request StoreC
}

func (cs *Storage) storeCredential(ctx context.Context, request StoreCredentialRequest, namespace string) error {

wc, err := cs.getStoreCredentialWriteContext(request, namespace)
if err != nil {
return errors.Wrap(err, "could not get stored credential write context")
}
// TODO(gabe) conflict checking?
return cs.db.Write(ctx, wc.namespace, wc.key, wc.value)
}

func (cs *Storage) GetStoreCredentialWriteContext(request StoreCredentialRequest) (*WriteContext, error) {
return cs.getStoreCredentialWriteContext(request, credentialNamespace)
}

func (cs *Storage) GetStoreStatusListCredentialWriteContext(request StoreCredentialRequest) (*WriteContext, error) {
return cs.getStoreCredentialWriteContext(request, statusListCredentialNamespace)
}

func (cs *Storage) getStoreCredentialWriteContext(request StoreCredentialRequest, namespace string) (*WriteContext, error) {
if !request.IsValid() {
return util.LoggingNewError("store request request is not valid")
return nil, util.LoggingNewError("store request request is not valid")
}

// transform the credential into its denormalized form for storage
storedCredential, err := buildStoredCredential(request)
if err != nil {
return errors.Wrap(err, "could not build stored credential")
return nil, errors.Wrap(err, "could not build stored credential")
}

storedCredBytes, err := json.Marshal(storedCredential)
if err != nil {
return util.LoggingErrorMsgf(err, "could not store request: %s", storedCredential.CredentialID)
return nil, util.LoggingErrorMsgf(err, "could not store request: %s", storedCredential.CredentialID)
}
// TODO(gabe) conflict checking?
return cs.db.Write(ctx, namespace, storedCredential.ID, storedCredBytes)

wc := WriteContext{
namespace: namespace,
key: storedCredential.ID,
value: storedCredBytes,
}

return &wc, nil
}

// buildStoredCredential generically parses a store credential request and returns the object to be stored
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ func (b *BoltDB) Write(ctx context.Context, namespace string, key string, value
return nil
})
}
func (b *BoltDB) WriteMany(ctx context.Context, namespaces, keys []string, values [][]byte) error {
if len(namespaces) != len(keys) && len(namespaces) != len(values) {
return errors.New("namespaces, keys, and values, are not of equal length")
}

return b.db.Update(func(tx *bolt.Tx) error {
for i := range namespaces {
bucket, err := tx.CreateBucketIfNotExists([]byte(namespaces[i]))
if err != nil {
return err
}
if err = bucket.Put([]byte(keys[i]), values[i]); err != nil {
return err
}
}
return nil
})
}

func (b *BoltDB) Read(ctx context.Context, namespace, key string) ([]byte, error) {
var result []byte
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ServiceStorage interface {
IsOpen() bool
Close() error
Write(ctx context.Context, namespace, key string, value []byte) error
WriteMany(ctx context.Context, namespace, key []string, value [][]byte) error
Read(ctx context.Context, namespace, key string) ([]byte, error)
ReadAll(ctx context.Context, namespace string) (map[string][]byte, error)
ReadPrefix(ctx context.Context, namespace, prefix string) (map[string][]byte, error)
Expand Down

0 comments on commit bf882d1

Please sign in to comment.