Skip to content

Commit

Permalink
Address review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshanWeerasinghe committed Mar 21, 2024
1 parent f4f54da commit c415c47
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,26 +302,20 @@ BLangStatementExpression desugar(BLangQueryExpr queryExpr, SymbolEnv env,
resultType = streamRef.getBType();
} else if (queryExpr.isTable) {
BLangVariableReference tableRef = addTableConstructor(queryExpr, queryBlock);
if (onConflictExpr == null) {
result = getStreamFunctionVariableRef(queryBlock,
QUERY_ADD_TO_TABLE_FUNCTION, Lists.of(streamRef, tableRef, isReadonly), pos);
} else {
result = getStreamFunctionVariableRef(queryBlock,
QUERY_ADD_TO_TABLE_FOR_ON_CONFLICT_FUNCTION, Lists.of(streamRef, tableRef, isReadonly), pos);
}
Name internalFuncName = onConflictExpr == null ? QUERY_ADD_TO_TABLE_FUNCTION
: QUERY_ADD_TO_TABLE_FOR_ON_CONFLICT_FUNCTION;
result = getStreamFunctionVariableRef(queryBlock,
internalFuncName, Lists.of(streamRef, tableRef, isReadonly), pos);
resultType = tableRef.getBType();
onConflictExpr = null;
} else if (queryExpr.isMap) {
BMapType mapType = getMapType(queryExpr.getBType());
BLangRecordLiteral.BLangMapLiteral mapLiteral = new BLangRecordLiteral.BLangMapLiteral(queryExpr.pos,
mapType, new ArrayList<>());
if (onConflictExpr == null) {
result = getStreamFunctionVariableRef(queryBlock,
QUERY_ADD_TO_MAP_FUNCTION, Lists.of(streamRef, mapLiteral, isReadonly), pos);
} else {
result = getStreamFunctionVariableRef(queryBlock,
QUERY_ADD_TO_MAP_FOR_ON_CONFLICT_FUNCTION, Lists.of(streamRef, mapLiteral, isReadonly), pos);
}
Name internalFuncName = onConflictExpr == null ? QUERY_ADD_TO_MAP_FUNCTION
: QUERY_ADD_TO_MAP_FOR_ON_CONFLICT_FUNCTION;
result = getStreamFunctionVariableRef(queryBlock,
internalFuncName, Lists.of(streamRef, mapLiteral, isReadonly), pos);
onConflictExpr = null;
} else if (queryExpr.getFinalClause().getKind() == NodeKind.COLLECT) {
result = getStreamFunctionVariableRef(queryBlock, COLLECT_QUERY_FUNCTION, Lists.of(streamRef), pos);
Expand Down Expand Up @@ -940,10 +934,10 @@ BLangVariableReference addSelectFunction(BLangBlockStmt blockStmt, BLangSelectCl
/**
* Desugar onConflictClause to below and return a reference to created onConflict _StreamFunction.
* _StreamFunction onConflictFunc = createOnConflictFunction
* @param blockStmt
* @param onConflictClause
* @param stmtsToBePropagated
* @return
* @param blockStmt parent block to write to.
* @param onConflictClause to be desugared.
* @param stmtsToBePropagated list of statements to be propagated.
* @return variableReference to created onConflict _StreamFunction.
*/
BLangVariableReference addOnConflictFunction(BLangBlockStmt blockStmt, BLangOnConflictClause onConflictClause,
List<BLangStatement> stmtsToBePropagated) {
Expand Down
34 changes: 13 additions & 21 deletions langlib/lang.query/src/main/ballerina/helpers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ function createSelectFunction(function(_Frame _frame) returns _Frame|error? sele
}

function createOnConflictFunction(function(_Frame _frame) returns _Frame|error? onConflictFunc)
returns _StreamFunction {
return new _OnConflictFunction(onConflictFunc);
}
returns _StreamFunction => new _OnConflictFunction(onConflictFunc);

function createCollectFunction(string[] nonGroupingKeys, function(_Frame _frame) returns _Frame|error? collectFunc) returns _StreamFunction {
return new _CollectFunction(nonGroupingKeys, collectFunc);
Expand All @@ -98,9 +96,8 @@ function getStreamFromPipeline(_StreamPipeline pipeline) returns stream<Type, Co
return pipeline.getStream();
}

function getStreamForOnConflictFromPipeline(_StreamPipeline pipeline) returns stream<Type, CompletionType> {
return pipeline.getStreamForOnConflict();
}
function getStreamForOnConflictFromPipeline(_StreamPipeline pipeline) returns stream<Type, CompletionType>
=> pipeline.getStreamForOnConflict();

function toArray(stream<Type, CompletionType> strm, Type[] arr, boolean isReadOnly) returns Type[]|error {
if isReadOnly {
Expand Down Expand Up @@ -208,9 +205,9 @@ function addToTableForOnConflict(stream<Type, CompletionType> strm, table<map<Ty
// original table variable (tbl). Then the newly created table variable will be populated using createTable()
// and make it immutable with createImmutableTable().
table<map<Type>> tempTbl = table [];
table<map<Type>> tbl2 = createTableWithKeySpecifier(tbl, typeof(tempTbl));
table<map<Type>> tempTable = check createTableForOnConflict(strm, tbl2);
return createImmutableTable(tbl, tempTable.toArray());
table<map<Type>> mutableTableRef = createTableWithKeySpecifier(tbl, typeof(tempTbl));
_ = check createTableForOnConflict(strm, mutableTableRef);
return createImmutableTable(tbl, mutableTableRef.toArray());
}
return createTableForOnConflict(strm, tbl);
}
Expand All @@ -219,11 +216,12 @@ function createTableForOnConflict(stream<Type, CompletionType> strm, table<map<T
returns table<map<Type>>|error {
record {| Type value; |}|CompletionType v = strm.next();
while v is record {| Type value; |} {
record {|Type v; error? err;|}|error value = trap (<record {|Type v; error? err;|}> checkpanic v.value);
record {|Type v; error? err;|}|error value = v.value.ensureType();
if value is error {
return value;
}
error? e = trap tbl.add(<map<Type>> checkpanic value.v);
map<Type> tblValue = check value.v.ensureType();
error? e = trap tbl.add(tblValue);
error? err = value.err;
if e is error && err is error {
return err;
Expand All @@ -233,10 +231,7 @@ function createTableForOnConflict(stream<Type, CompletionType> strm, table<map<T
}
v = strm.next();
}
if (v is error) {
return v;
}
return tbl;
return v is error ? v : tbl;
}

function addToMap(stream<Type, CompletionType> strm, map<Type> mp, boolean isReadOnly) returns map<Type>|error {
Expand Down Expand Up @@ -286,11 +281,11 @@ function addToMapForOnConflict(stream<Type, CompletionType> strm, map<Type> mp,
function createMapForOnConflict(stream<Type, CompletionType> strm, map<Type> mp) returns map<Type>|error {
record {| Type value; |}|CompletionType v = strm.next();
while v is record {| Type value; |} {
record {|Type v; error? err;|}|error value = trap (<record {|Type v; error? err;|}> checkpanic v.value);
record {|Type v; error? err;|}|error value = v.value.ensureType();
if value is error {
return value;
}
[string, Type]|error keyValue = trap (<[string, Type]> checkpanic value.v);
[string, Type]|error keyValue = value.v.ensureType();
if keyValue is error {
return keyValue;
}
Expand All @@ -302,10 +297,7 @@ function createMapForOnConflict(stream<Type, CompletionType> strm, map<Type> mp)
mp[key] = keyValue[1];
v = strm.next();
}
if (v is error) {
return v;
}
return mp;
return v is error ? v : mp;
}

function consumeStream(stream<Type, CompletionType> strm) returns any|error {
Expand Down

0 comments on commit c415c47

Please sign in to comment.