Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for entities with incomplete key to Datastore.put #1040

Merged
merged 1 commit into from
Jun 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,65 @@ public final void update(Entity... entities) {
}
}

@SafeVarargs
private void putInternal(FullEntity<Key> entity) {
Key key = entity.key();
toAdd.remove(key);
toUpdate.remove(key);
toDelete.remove(key);
toPut.put(key, entity);
}

@Override
public final void put(Entity... entities) {
public final Entity put(FullEntity<?> entity) {
return DatastoreHelper.put(this, entity);
}

@SuppressWarnings("unchecked")
@Override
public final void putWithDeferredIdAllocation(FullEntity<?>... entities) {
validateActive();
for (Entity entity : entities) {
Key key = entity.key();
toAdd.remove(key);
toUpdate.remove(key);
toDelete.remove(key);
toPut.put(key, entity);
for (FullEntity<?> entity : entities) {
IncompleteKey key = entity.key();
Preconditions.checkArgument(key != null, "Entity must have a key");
if (key instanceof Key) {
putInternal(Entity.convert((FullEntity<Key>) entity));
} else {
toAddAutoId.add((FullEntity<IncompleteKey>) entity);
}
}
}

@SuppressWarnings("unchecked")
@Override
public final List<Entity> put(FullEntity<?>... entities) {
validateActive();
List<IncompleteKey> incompleteKeys = Lists.newArrayListWithExpectedSize(entities.length);
for (FullEntity<?> entity : entities) {
IncompleteKey key = entity.key();
Preconditions.checkArgument(key != null, "Entity must have a key");
if (!(key instanceof Key)) {
incompleteKeys.add(key);
}
}
Iterator<Key> allocated;
if (!incompleteKeys.isEmpty()) {
IncompleteKey[] toAllocate = Iterables.toArray(incompleteKeys, IncompleteKey.class);
allocated = datastore().allocateId(toAllocate).iterator();
} else {
allocated = Collections.emptyIterator();
}
List<Entity> answer = Lists.newArrayListWithExpectedSize(entities.length);
for (FullEntity<?> entity : entities) {
if (entity.key() instanceof Key) {
putInternal((FullEntity<Key>) entity);
answer.add(Entity.convert((FullEntity<Key>) entity));
} else {
Entity entityWithAllocatedId = Entity.builder(allocated.next(), entity).build();
putInternal(entityWithAllocatedId);
answer.add(entityWithAllocatedId);
}
}
return answer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,35 @@ interface TransactionCallable<T> {
* @throws DatastoreException upon failure
* @see #allocateId(IncompleteKey)
*/
List<Key> allocateId(IncompleteKey... key);
List<Key> allocateId(IncompleteKey... keys);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void update(Entity... entity);
void update(Entity... entities);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void put(Entity... entity);
Entity put(FullEntity<?> entity);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void delete(Key... key);
List<Entity> put(FullEntity<?>... entities);

/**
* {@inheritDoc}
* @throws DatastoreException upon failure
*/
@Override
void delete(Key... keys);

/**
* Returns a new KeyFactory for this service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,72 @@
import java.util.List;

/**
* An interface to represent a batch of write operations.
* All write operation for a batch writer will be applied to the Datastore in one RPC call.
* An interface to represent a batch of write operations. All write operation for a batch writer
* will be applied to the Datastore in one RPC call.
*/
interface DatastoreBatchWriter extends DatastoreWriter {

/**
* Datastore add operation.
* This method will also allocate id for any entity with an incomplete key.
* As oppose to {@link #add(FullEntity)}, this method will defer any necessary id allocation
* Datastore add operation. This method will also allocate id for any entity with an incomplete
* key. As opposed to {@link #add(FullEntity)}, this method will defer any necessary id allocation
* to submit time.
*
* @throws IllegalArgumentException if any of the given entities is missing a key
* @throws DatastoreException if a given entity with a
* complete key was already added to this writer or if not active
* @throws DatastoreException if a given entity with a complete key was already added to this
* writer or if not active
*/
void addWithDeferredIdAllocation(FullEntity<?>... entity);
void addWithDeferredIdAllocation(FullEntity<?>... entities);

/**
* {@inheritDoc}
* For entities with complete keys that were marked for deletion in this writer the operation
* will be changed to {@link #put}.
* @throws DatastoreException if a given entity with the
* same complete key was already added to this writer, if writer is not active or
* if id allocation for an entity with an incomplete key failed.
*
* @throws DatastoreException if a given entity with the same complete key was already added to
* this writer, if writer is not active or if id allocation for an entity with an incomplete
* key failed.
*/
@Override
List<Entity> add(FullEntity<?>... entity);
List<Entity> add(FullEntity<?>... entities);

/**
* {@inheritDoc}
* This operation will be converted to {@link #put} operation for entities that were already
* added or put in this writer
* @throws DatastoreException if an entity is marked for
* deletion in this writer or if not active
* added or put in this writer.
*
* @throws DatastoreException if an entity is marked for deletion in this writer or if not active
*/
@Override
void update(Entity... entity);
void update(Entity... entities);

/**
* {@inheritDoc}
* This operation will also remove from this batch any prior writes for entities with the same
* keys
* keys
*
* @throws DatastoreException if not active
*/
@Override
void delete(Key... key);
void delete(Key... keys);

/**
* Datastore put operation. This method will also allocate id for any entity with an incomplete
* key. As opposed to {@link #put(FullEntity[])}, this method will defer any necessary id
* allocation to submit time.
*
* @throws IllegalArgumentException if any of the given entities is missing a key
* @throws DatastoreException if not active
*/
void putWithDeferredIdAllocation(FullEntity<?>... entities);

/**
* {@inheritDoc}
* This operation will also remove from this writer any prior writes for the same entities.
*
* @throws DatastoreException if not active
*/
@Override
void put(Entity... entity);
List<Entity> put(FullEntity<?>... entities);

/**
* Returns {@code true} if still active (write operations were not sent to the Datastore).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ static Entity add(DatastoreWriter writer, FullEntity<?> entity) {
return writer.add(new FullEntity<?>[] {entity}).get(0);
}

static Entity put(DatastoreWriter writer, FullEntity<?> entity) {
return writer.put(new FullEntity<?>[] {entity}).get(0);
}

static KeyFactory newKeyFactory(DatastoreOptions options) {
return new KeyFactory(options.projectId(), options.namespace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ com.google.datastore.v1beta3.RunQueryResponse runQuery(
try {
return RetryHelper.runWithRetries(
new Callable<com.google.datastore.v1beta3.RunQueryResponse>() {
@Override public com.google.datastore.v1beta3.RunQueryResponse call()
throws DatastoreException {
return datastoreRpc.runQuery(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
@Override
public com.google.datastore.v1beta3.RunQueryResponse call()
throws DatastoreException {
return datastoreRpc.runQuery(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
} catch (RetryHelperException e) {
throw DatastoreException.translateAndThrow(e);
}
Expand Down Expand Up @@ -125,11 +126,12 @@ com.google.datastore.v1beta3.AllocateIdsResponse allocateIds(
try {
return RetryHelper.runWithRetries(
new Callable<com.google.datastore.v1beta3.AllocateIdsResponse>() {
@Override public com.google.datastore.v1beta3.AllocateIdsResponse call()
throws DatastoreException {
return datastoreRpc.allocateIds(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
@Override
public com.google.datastore.v1beta3.AllocateIdsResponse call()
throws DatastoreException {
return datastoreRpc.allocateIds(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
} catch (RetryHelperException e) {
throw DatastoreException.translateAndThrow(e);
}
Expand Down Expand Up @@ -166,7 +168,7 @@ public List<Entity> add(FullEntity<?>... entities) {
"Duplicate entity with the key %s", entity.key());
}
} else {
Preconditions.checkArgument(entity.hasKey(), "entity %s is missing a key", entity);
Preconditions.checkArgument(entity.hasKey(), "Entity %s is missing a key", entity);
}
mutationsPb.add(com.google.datastore.v1beta3.Mutation.newBuilder()
.setInsert(entity.toPb()).build());
Expand Down Expand Up @@ -281,19 +283,19 @@ com.google.datastore.v1beta3.LookupResponse lookup(
try {
return RetryHelper.runWithRetries(
new Callable<com.google.datastore.v1beta3.LookupResponse>() {
@Override public com.google.datastore.v1beta3.LookupResponse call()
throws DatastoreException {
return datastoreRpc.lookup(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
@Override
public com.google.datastore.v1beta3.LookupResponse call()
throws DatastoreException {
return datastoreRpc.lookup(requestPb);
}
}, retryParams, EXCEPTION_HANDLER, options().clock());
} catch (RetryHelperException e) {
throw DatastoreException.translateAndThrow(e);
}
}

@SafeVarargs
@Override
public final void update(Entity... entities) {
public void update(Entity... entities) {
if (entities.length > 0) {
List<com.google.datastore.v1beta3.Mutation> mutationsPb =
new ArrayList<>();
Expand All @@ -309,22 +311,47 @@ public final void update(Entity... entities) {
}
}

@SafeVarargs
@Override
public final void put(Entity... entities) {
if (entities.length > 0) {
List<com.google.datastore.v1beta3.Mutation> mutationsPb =
new ArrayList<>();
Map<Key, Entity> dedupEntities = new LinkedHashMap<>();
for (Entity entity : entities) {
dedupEntities.put(entity.key(), entity);
}
for (Entity e : dedupEntities.values()) {
public Entity put(FullEntity<?> entity) {
return DatastoreHelper.put(this, entity);
}

@SuppressWarnings("unchecked")
@Override
public List<Entity> put(FullEntity<?>... entities) {
if (entities.length == 0) {
return Collections.emptyList();
}
List<com.google.datastore.v1beta3.Mutation> mutationsPb = new ArrayList<>();
Map<Key, Entity> dedupEntities = new LinkedHashMap<>();
for (FullEntity<?> entity : entities) {
Preconditions.checkArgument(entity.hasKey(), "Entity %s is missing a key", entity);
if (entity.key() instanceof Key) {
Entity completeEntity = Entity.convert((FullEntity<Key>) entity);
dedupEntities.put(completeEntity.key(), completeEntity);
} else {
mutationsPb.add(
com.google.datastore.v1beta3.Mutation.newBuilder().setUpsert(e.toPb()).build());
com.google.datastore.v1beta3.Mutation.newBuilder().setUpsert(entity.toPb()).build());
}
}
for (Entity entity : dedupEntities.values()) {
mutationsPb.add(
com.google.datastore.v1beta3.Mutation.newBuilder().setUpsert(entity.toPb()).build());
}
com.google.datastore.v1beta3.CommitResponse commitResponse = commitMutation(mutationsPb);
Iterator<com.google.datastore.v1beta3.MutationResult> mutationResults =
commitResponse.getMutationResultsList().iterator();
ImmutableList.Builder<Entity> responseBuilder = ImmutableList.builder();
for (FullEntity<?> entity : entities) {
Entity completeEntity = dedupEntities.get(entity.key());
if (completeEntity != null) {
responseBuilder.add(completeEntity);
} else {
responseBuilder.add(
Entity.builder(Key.fromPb(mutationResults.next().getKey()), entity).build());
}
commitMutation(mutationsPb);
}
return responseBuilder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface DatastoreReader {
* @throws DatastoreException upon failure
* @see #get(Key)
*/
Iterator<Entity> get(Key... key);
Iterator<Entity> get(Key... keys);

/**
* Returns a list with a value for each given key (ordered by input). {@code null} values are
Expand Down
Loading