Skip to content
This repository has been archived by the owner on Dec 12, 2024. It is now read-only.

Batch Writes #252

Merged
merged 10 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions pkg/service/credential/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s Service) CreateCredential(ctx context.Context, request CreateCredentialR

logrus.Debugf("creating credential: %+v", request)

writeContexts := make([]WriteContext, 0)
builder := credential.NewVerifiableCredentialBuilder()

if err := builder.SetIssuer(request.Issuer); err != nil {
Expand Down Expand Up @@ -176,6 +177,34 @@ func (s Service) CreateCredential(ctx context.Context, request CreateCredentialR
if err := builder.SetCredentialStatus(status); err != nil {
return nil, util.LoggingErrorMsg(err, "could not set credential status")
}

statusListCredJWT, err := s.signCredentialJWT(ctx, request.Issuer, *statusListCredential)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not sign status list credential")
}

statusListContainer := credint.Container{
ID: statusListCredential.ID,
Credential: statusListCredential,
CredentialJWT: statusListCredJWT,
}

statusListStorageRequest := StoreCredentialRequest{
Container: statusListContainer,
}

statusListIndexWriteContext, err := s.storage.GetIncrementStatusListIndexWriteContext(ctx)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not get status list index write context")
}

statusListCredWriteContext, err := s.storage.GetStoreStatusListCredentialWriteContext(statusListStorageRequest)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not get status list credential write context")
}

writeContexts = append(writeContexts, *statusListIndexWriteContext)
writeContexts = append(writeContexts, *statusListCredWriteContext)
}

cred, err := builder.Build()
Expand All @@ -191,29 +220,33 @@ func (s Service) CreateCredential(ctx context.Context, request CreateCredentialR
}

// TODO(gabe) support Data Integrity creds too https://github.com/TBD54566975/ssi-service/issues/105
// sign the credential
credJWT, err := s.signCredentialJWT(ctx, request.Issuer, *cred)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not sign credential")
}

// store the credential
container := credint.Container{
ID: cred.ID,
Credential: cred,
CredentialJWT: credJWT,
Revoked: false,
}

storageRequest := StoreCredentialRequest{
credentialStorageRequest := StoreCredentialRequest{
Container: container,
}

if err = s.storage.StoreCredential(ctx, storageRequest); err != nil {
return nil, util.LoggingErrorMsg(err, "could not store credential")
credWriteContext, err := s.storage.GetStoreCredentialWriteContext(credentialStorageRequest)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not get credential write context")

}
writeContexts = append(writeContexts, *credWriteContext)

if err = s.storage.WriteMany(ctx, writeContexts); err != nil {
return nil, util.LoggingErrorMsg(err, "failed to save vc")
}

// return the result
response := CreateCredentialResponse{Container: container}
return &response, nil
}
Expand Down
91 changes: 82 additions & 9 deletions pkg/service/credential/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type StoredCredential struct {
Revoked bool `json:"revoked"`
}

type WriteContext struct {
namespace string
key string
value []byte
}

func (sc StoredCredential) IsValid() bool {
return sc.ID != "" && (sc.HasDataIntegrityCredential() || sc.HasJWTCredential())
}
Expand Down Expand Up @@ -127,16 +133,59 @@ func (cs *Storage) GetNextStatusListRandomIndex(ctx context.Context) (int, error
return -1, util.LoggingErrorMsgf(err, "could not unmarshal unique numbers")
}

return uniqueNums[statusListIndex.Index], nil
}

func (cs *Storage) WriteMany(ctx context.Context, writeContexts []WriteContext) error {
namespaces := make([]string, 0)
keys := make([]string, 0)
values := make([][]byte, 0)

for i := range writeContexts {
namespaces = append(namespaces, writeContexts[i].namespace)
keys = append(keys, writeContexts[i].key)
values = append(values, writeContexts[i].value)
}

return cs.db.WriteMany(ctx, namespaces, keys, values)
}

func (cs *Storage) IncrementStatusListIndex(ctx context.Context) error {
wc, err := cs.GetIncrementStatusListIndexWriteContext(ctx)
if err != nil {
return util.LoggingErrorMsg(err, "problem getting increment status listIndex writeContext")
}

if err := cs.db.Write(ctx, wc.namespace, wc.key, wc.value); err != nil {
return util.LoggingErrorMsg(err, "problem writing current list index to db")
}

return nil
}

func (cs *Storage) GetIncrementStatusListIndexWriteContext(ctx context.Context) (*WriteContext, error) {
gotCurrentListIndexBytes, err := cs.db.Read(ctx, statusListIndexNamespace, currentListIndexKey)
if err != nil {
return nil, util.LoggingErrorMsgf(err, "could not get list index")
}

var statusListIndex StatusListIndex
if err = json.Unmarshal(gotCurrentListIndexBytes, &statusListIndex); err != nil {
return nil, util.LoggingErrorMsgf(err, "could not unmarshal unique numbers")
}

statusListIndexBytes, err := json.Marshal(StatusListIndex{Index: statusListIndex.Index + 1})
if err != nil {
return -1, util.LoggingErrorMsg(err, "could not marshal status list index bytes")
return nil, util.LoggingErrorMsg(err, "could not marshal status list index bytes")
}

if err := cs.db.Write(ctx, statusListIndexNamespace, currentListIndexKey, statusListIndexBytes); err != nil {
return -1, util.LoggingErrorMsg(err, "problem writing current list index to db")
wc := WriteContext{
namespace: statusListIndexNamespace,
key: currentListIndexKey,
value: statusListIndexBytes,
}

return uniqueNums[statusListIndex.Index], nil
return &wc, nil
}

func (cs *Storage) StoreCredential(ctx context.Context, request StoreCredentialRequest) error {
Expand All @@ -148,22 +197,46 @@ func (cs *Storage) StoreStatusListCredential(ctx context.Context, request StoreC
}

func (cs *Storage) storeCredential(ctx context.Context, request StoreCredentialRequest, namespace string) error {

wc, err := cs.getStoreCredentialWriteContext(request, namespace)
if err != nil {
return errors.Wrap(err, "could not get stored credential write context")
}
// TODO(gabe) conflict checking?
return cs.db.Write(ctx, wc.namespace, wc.key, wc.value)
}

func (cs *Storage) GetStoreCredentialWriteContext(request StoreCredentialRequest) (*WriteContext, error) {
return cs.getStoreCredentialWriteContext(request, credentialNamespace)
}

func (cs *Storage) GetStoreStatusListCredentialWriteContext(request StoreCredentialRequest) (*WriteContext, error) {
return cs.getStoreCredentialWriteContext(request, statusListCredentialNamespace)
}

func (cs *Storage) getStoreCredentialWriteContext(request StoreCredentialRequest, namespace string) (*WriteContext, error) {
if !request.IsValid() {
return util.LoggingNewError("store request request is not valid")
return nil, util.LoggingNewError("store request request is not valid")
}

// transform the credential into its denormalized form for storage
storedCredential, err := buildStoredCredential(request)
if err != nil {
return errors.Wrap(err, "could not build stored credential")
return nil, errors.Wrap(err, "could not build stored credential")
}

storedCredBytes, err := json.Marshal(storedCredential)
if err != nil {
return util.LoggingErrorMsgf(err, "could not store request: %s", storedCredential.CredentialID)
return nil, util.LoggingErrorMsgf(err, "could not store request: %s", storedCredential.CredentialID)
}
// TODO(gabe) conflict checking?
return cs.db.Write(ctx, namespace, storedCredential.ID, storedCredBytes)

wc := WriteContext{
namespace: namespace,
key: storedCredential.ID,
value: storedCredBytes,
}

return &wc, nil
}

// buildStoredCredential generically parses a store credential request and returns the object to be stored
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ func (b *BoltDB) Write(ctx context.Context, namespace string, key string, value
return nil
})
}
func (b *BoltDB) WriteMany(ctx context.Context, namespaces, keys []string, values [][]byte) error {
if len(namespaces) != len(keys) && len(namespaces) != len(values) {
return errors.New("namespaces, keys, and values, are not of equal length")
}

return b.db.Update(func(tx *bolt.Tx) error {
for i := range namespaces {
bucket, err := tx.CreateBucketIfNotExists([]byte(namespaces[i]))
if err != nil {
return err
}
if err = bucket.Put([]byte(keys[i]), values[i]); err != nil {
return err
}
}
return nil
})
}

func (b *BoltDB) Read(ctx context.Context, namespace, key string) ([]byte, error) {
var result []byte
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ServiceStorage interface {
IsOpen() bool
Close() error
Write(ctx context.Context, namespace, key string, value []byte) error
WriteMany(ctx context.Context, namespace, key []string, value [][]byte) error
Read(ctx context.Context, namespace, key string) ([]byte, error)
ReadAll(ctx context.Context, namespace string) (map[string][]byte, error)
ReadPrefix(ctx context.Context, namespace, prefix string) (map[string][]byte, error)
Expand Down