Skip to content

Commit

Permalink
implement cancel function (#1367)
Browse files Browse the repository at this point in the history
* implement cancel function

* addressing comments

* addressing comments

* Future implementation

* Future implementation

* Future implementation

* Future implementation

* Future implementation

* adding Future Task

* adding Future Task

* adding Future Task

* fixing future implementation

* fixing future implementation

* fixing future implementation

* fixing future implementation

* fixing future implementation

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing bugs

* fixing issues

* fixing issues

* fixing issues

* fixing issues

* fixing bugs

* fixing bugs

* fixing bugs

* fixing bugs

* fixing bugs

* fixing bugs

* fixing bugs

* fixing bugs

* ExecutionException

* fixing issues

* aggregation changes

* aggregation changes

* aggregation changes

* fixing bugs

* fixing bugs

* fixing bugs

* rebasing

* adddressing comments

* adddressing comments

* addressing comments

* addressing comments

Co-authored-by: Ramsha Rao <[email protected]>
  • Loading branch information
2 people authored and Aaron Klish committed Aug 7, 2020
1 parent fa22727 commit 4e62b2a
Show file tree
Hide file tree
Showing 29 changed files with 161 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,10 @@ default boolean supportsPagination(Class<?> entityClass, FilterExpression expres
* @return UUID id
*/
UUID getRequestId();

/**
* Cancel running transaction.
* Implementation must be thread-safe.
*/
void cancel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,9 @@ public boolean supportsPagination(Class<?> entityClass, FilterExpression express
private boolean containsObject(Object obj) {
return dataStore.get(obj.getClass()).containsValue(obj);
}

@Override
public void cancel() {
//nothing to cancel in HashMap store transaction
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,9 @@ private Pair<Optional<Pagination>, Optional<Pagination>> splitPagination(
return Pair.of(pagination, Optional.empty());
}
}

@Override
public void cancel() {
tx.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,9 @@ public Iterable<Object> loadObjects(EntityProjection projection, RequestScope sc
public void close() throws IOException {
tx.close();
}

@Override
public void cancel() {
tx.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,9 @@ public void commit(RequestScope scope) {
public void createObject(Object entity, RequestScope scope) {
// nothing
}

@Override
public void cancel() {
//nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,6 @@ public void testGetBoundByVersion() {
assertEquals(3, models.size()); //Also includes com.yahoo.elide inner classes from this file.
assertTrue(models.contains(BookV2.class));


models = getBoundClassesByVersion(NO_VERSION);
assertEquals(14, models.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ Query buildQuery(EntityProjection entityProjection, RequestScope scope) {
scope.getDictionary());
return translator.getQuery();
}

@Override
public void cancel() {
queryEngineTransaction.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.util.Map;
import java.util.stream.Collectors;

/**
* A {@link QueryEngine} is an abstraction that an AggregationDataStore leverages to run analytic queries (OLAP style)
* against an underlying persistence layer.
Expand Down Expand Up @@ -157,6 +156,11 @@ private void populateMetaData(MetaDataStore metaDataStore) {
public interface Transaction extends AutoCloseable {
@Override
void close();

/**
* Cancels running transaction
*/
void cancel();
}

public abstract Transaction beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
Expand All @@ -60,13 +61,14 @@
@Slf4j
public class SQLQueryEngine extends QueryEngine {
private final EntityManagerFactory entityManagerFactory;

private final Consumer<EntityManager> transactionCancel;
private final SQLReferenceTable referenceTable;

public SQLQueryEngine(MetaDataStore metaDataStore, EntityManagerFactory entityManagerFactory) {
public SQLQueryEngine(MetaDataStore metaDataStore, EntityManagerFactory eMFactory, Consumer<EntityManager> txC) {
super(metaDataStore);
this.entityManagerFactory = entityManagerFactory;
this.entityManagerFactory = eMFactory;
this.referenceTable = new SQLReferenceTable(metaDataStore);
this.transactionCancel = txC;
}

@Override
Expand Down Expand Up @@ -118,14 +120,17 @@ public MetricProjection constructMetricProjection(Metric metric,
/**
* State needed for SQLQueryEngine to execute queries.
*/
static class SqlTransaction implements QueryEngine.Transaction {
static class SqlTransaction implements QueryEngine.Transaction {

private final EntityManager entityManager;
private final EntityTransaction transaction;
private final Consumer<EntityManager> transactionCancel;

SqlTransaction(EntityManagerFactory emf, Consumer<EntityManager> transactionCancel) {

SqlTransaction(EntityManagerFactory emf) {
entityManager = emf.createEntityManager();
transaction = entityManager.getTransaction();
this.transactionCancel = transactionCancel;
if (!transaction.isActive()) {
transaction.begin();
}
Expand All @@ -140,11 +145,17 @@ public void close() {
entityManager.close();
}
}

@Override
public void cancel() {
transactionCancel.accept(entityManager);
}

}

@Override
public QueryEngine.Transaction beginTransaction() {
return new SqlTransaction(entityManagerFactory);
return new SqlTransaction(entityManagerFactory, transactionCancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void testReferenceLoop() {

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> new SQLQueryEngine(metaDataStore, null));
() -> new SQLQueryEngine(metaDataStore, null, null));
assertEquals(
"Formula reference loop found: loop.playerLevel->loop.playerLevel",
exception.getMessage());
Expand All @@ -36,7 +36,7 @@ public void testJoinToLoop() {

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> new SQLQueryEngine(metaDataStore, null));
() -> new SQLQueryEngine(metaDataStore, null, null));
assertEquals(
"Formula reference loop found: joinToLoop.playerLevel->joinToLoop.playerLevel",
exception.getMessage());
Expand All @@ -49,7 +49,7 @@ public void testCrossClassReferenceLoop() {

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> new SQLQueryEngine(metaDataStore, null));
() -> new SQLQueryEngine(metaDataStore, null, null));

String exception1 = "Formula reference loop found: loopCountryA.inUsa->loopCountryB.inUsa->loopCountryA.inUsa";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void testReferenceLoop() {

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> new SQLQueryEngine(metaDataStore, null));
() -> new SQLQueryEngine(metaDataStore, null, null));
assertEquals(
"Formula reference loop found: loop.highScore->loop.highScore",
exception.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
import com.yahoo.elide.datastores.jpa.transaction.NonJtaTransaction;
import com.yahoo.elide.datastores.multiplex.MultiplexManager;

import org.hibernate.Session;

import java.util.function.Consumer;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

public class AggregationDataStoreTestHarness implements DataStoreTestHarness {
Expand All @@ -26,14 +31,15 @@ public AggregationDataStoreTestHarness(EntityManagerFactory entityManagerFactory
@Override
public DataStore getDataStore() {
MetaDataStore metaDataStore = new MetaDataStore();
Consumer<EntityManager> txCancel = (em) -> { em.unwrap(Session.class).cancelQuery(); };

AggregationDataStore aggregationDataStore = AggregationDataStore.builder()
.queryEngine(new SQLQueryEngine(metaDataStore, entityManagerFactory))
.queryEngine(new SQLQueryEngine(metaDataStore, entityManagerFactory, txCancel))
.build();

DataStore jpaStore = new JpaDataStore(
() -> entityManagerFactory.createEntityManager(),
NonJtaTransaction::new
(em) -> { return new NonJtaTransaction(em, txCancel); }
);

return new MultiplexManager(jpaStore, metaDataStore, aggregationDataStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import com.yahoo.elide.request.Argument;
import com.yahoo.elide.utils.ClassScanner;

import org.hibernate.Session;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -83,8 +85,9 @@ public static void init() {
filterParser = new RSQLFilterDialect(dictionary);

metaDataStore.populateEntityDictionary(dictionary);
Consumer<EntityManager> txCancel = (entityManager) -> { entityManager.unwrap(Session.class).cancelQuery(); };
engine = new SQLQueryEngine(metaDataStore, emf, txCancel);

engine = new SQLQueryEngine(metaDataStore, emf);
playerStatsTable = engine.getTable("playerStats");

ASIA.setName("Asia");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void init() {
* Test loading all three records from the table.
*/
@Test
public void testFullTableLoad() {
public void testFullTableLoad() throws Exception {
Query query = Query.builder()
.table(playerStatsTable)
.metric(invoke(playerStatsTable.getMetric("lowScore")))
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testFullTableLoad() {
* Test loading records using {@link FromSubquery}
*/
@Test
public void testFromSubQuery() {
public void testFromSubQuery() throws Exception {
Query query = Query.builder()
.table(playerStatsViewTable)
.metric(invoke(playerStatsViewTable.getMetric("highScore")))
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testNotProjectedFilter() throws Exception {
}

@Test
public void testSortAggregatedMetric() {
public void testSortAggregatedMetric() throws Exception {
Map<String, Sorting.SortOrder> sortMap = new TreeMap<>();
sortMap.put("lowScore", Sorting.SortOrder.desc);

Expand Down Expand Up @@ -217,7 +217,7 @@ public void testSortAggregatedMetric() {
* Test sorting by dimension attribute which is not present in the query.
*/
@Test
public void testSortJoin() {
public void testSortJoin() throws Exception {
Map<String, Sorting.SortOrder> sortMap = new TreeMap<>();
sortMap.put("playerName", Sorting.SortOrder.asc);

Expand Down Expand Up @@ -259,7 +259,7 @@ public void testSortJoin() {
* Test pagination.
*/
@Test
public void testPagination() {
public void testPagination() throws Exception {
Query query = Query.builder()
.table(playerStatsTable)
.metric(invoke(playerStatsTable.getMetric("lowScore")))
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testHavingClauseJoin() throws Exception {
* Test sorting by two different columns-one metric and one dimension.
*/
@Test
public void testSortByMultipleColumns() {
public void testSortByMultipleColumns() throws Exception {
Map<String, Sorting.SortOrder> sortMap = new TreeMap<>();
sortMap.put("lowScore", Sorting.SortOrder.desc);
sortMap.put("playerName", Sorting.SortOrder.asc);
Expand Down Expand Up @@ -392,7 +392,7 @@ public void testSortByMultipleColumns() {
* Test grouping by a dimension with a JoinTo annotation.
*/
@Test
public void testJoinToGroupBy() {
public void testJoinToGroupBy() throws Exception {
Query query = Query.builder()
.table(playerStatsTable)
.metric(invoke(playerStatsTable.getMetric("highScore")))
Expand Down Expand Up @@ -452,7 +452,7 @@ public void testJoinToFilter() throws Exception {
* Test grouping by a dimension with a JoinTo annotation.
*/
@Test
public void testJoinToSort() {
public void testJoinToSort() throws Exception {
Map<String, Sorting.SortOrder> sortMap = new TreeMap<>();
sortMap.put("countryIsoCode", Sorting.SortOrder.asc);
sortMap.put("highScore", Sorting.SortOrder.asc);
Expand Down Expand Up @@ -495,7 +495,7 @@ public void testJoinToSort() {
* Test month grain query.
*/
@Test
public void testTotalScoreByMonth() {
public void testTotalScoreByMonth() throws Exception {
Query query = Query.builder()
.table(playerStatsTable)
.metric(invoke(playerStatsTable.getMetric("highScore")))
Expand All @@ -517,7 +517,7 @@ public void testTotalScoreByMonth() {
* Test filter by time dimension.
*/
@Test
public void testFilterByTemporalDimension() {
public void testFilterByTemporalDimension() throws Exception {
FilterPredicate predicate = new FilterPredicate(
new Path(PlayerStats.class, dictionary, "recordedDate"),
Operator.IN,
Expand All @@ -542,7 +542,7 @@ public void testFilterByTemporalDimension() {
}

@Test
public void testAmbiguousFields() {
public void testAmbiguousFields() throws Exception {
Map<String, Sorting.SortOrder> sortMap = new TreeMap<>();
sortMap.put("lowScore", Sorting.SortOrder.asc);

Expand Down Expand Up @@ -581,7 +581,7 @@ public void testAmbiguousFields() {
}

@Test
public void testNullJoinToStringValue() {
public void testNullJoinToStringValue() throws Exception {
Query query = Query.builder()
.table(playerStatsTable)
.metric(invoke(playerStatsTable.getMetric("highScore")))
Expand All @@ -606,7 +606,7 @@ public void testNullJoinToStringValue() {
}

@Test
public void testNullJoinToIntValue() {
public void testNullJoinToIntValue() throws Exception {
Query query = Query.builder()
.table(playerStatsTable)
.metric(invoke(playerStatsTable.getMetric("highScore")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static void init() {
}

@Test
public void testViewAttribute() {
public void testViewAttribute() throws Exception {
Map<String, Sorting.SortOrder> sortMap = new TreeMap<>();
sortMap.put("countryViewIsoCode", Sorting.SortOrder.desc);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,9 @@ public Integer getQueryLimit() {
// no limit
return null;
}

@Override
public void cancel() {
session.cancelQuery();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,9 @@ public void close() throws IOException {
throw new IOException("Transaction not closed");
}
}

@Override
public void cancel() {
session.cancelQuery();
}
}
Loading

0 comments on commit 4e62b2a

Please sign in to comment.