Skip to content

Commit

Permalink
common RowSetGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Dec 15, 2023
1 parent a0fdead commit 8f5ebe9
Show file tree
Hide file tree
Showing 21 changed files with 830 additions and 1,028 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package org.apache.kyuubi.engine.chat.operation

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.chat.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.engine.chat.schema.{RowSetGenerator, SchemaHelper}
import org.apache.kyuubi.engine.chat.schema.RowSetGenerator.COL_STRING_TYPE
import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
import org.apache.kyuubi.session.Session
Expand All @@ -45,8 +46,11 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session
iter.fetchAbsolute(0)
}

val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion)
val taken = iter.take(rowSetSize).map(_.toSeq)
val resultRowSet = new RowSetGenerator().toTRowSet(
taken.toSeq,
Seq(COL_STRING_TYPE),
getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.chat.schema

import org.apache.kyuubi.engine.chat.schema.RowSetGenerator._
import org.apache.kyuubi.engine.schema.AbstractRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._

class RowSetGenerator
extends AbstractRowSetGenerator[Seq[String], Seq[String], String] {

override def getColumnSizeFromSchemaType(schema: Seq[String]): Int = schema.length

override def getColumnType(schema: Seq[String], ordinal: Int): String = COL_STRING_TYPE

override protected def isColumnNullAt(row: Seq[String], ordinal: Int): Boolean =
row(ordinal) == null

override def getColumnAs[T](row: Seq[String], ordinal: Int): T = row(ordinal).asInstanceOf[T]

override def toTColumn(
rows: Seq[Seq[String]],
ordinal: Int,
typ: String): TColumn = {
typ match {
case COL_STRING_TYPE => toTTypeColumn(STRING_TYPE, rows, ordinal)
case otherType => throw new UnsupportedOperationException(s"type $otherType")
}
}

override def toTColumnValue(
ordinal: Int,
row: Seq[String],
types: Seq[String]): TColumnValue = {
getColumnType(types, ordinal) match {
case "String" => toTTypeColumnVal(STRING_TYPE, row, ordinal)
case otherType => throw new UnsupportedOperationException(s"type $otherType")
}
}

}

object RowSetGenerator {
val COL_STRING_TYPE: String = classOf[String].getSimpleName
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.flink.types.Row

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.schema.RowSet
import org.apache.kyuubi.engine.flink.schema.{RowSet, RowSetGenerator}
import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
Expand Down Expand Up @@ -133,10 +133,9 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
case Some(tz) => ZoneId.of(tz)
case None => ZoneId.systemDefault()
}
val resultRowSet = RowSet.resultSetToTRowSet(
val resultRowSet = new RowSetGenerator(zoneId).toTRowSet(
batch.toList,
resultSet,
zoneId,
getProtocolVersion)
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
Expand Down
Loading

0 comments on commit 8f5ebe9

Please sign in to comment.