Skip to content

Commit

Permalink
add partition config for maxcompute (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Nov 18, 2021
1 parent c1f043a commit 389bead
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 1 deletion.
2 changes: 2 additions & 0 deletions nebula-exchange/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@
accessKeyId:xxx
accessKeySecret:xxx
partitionSpec:"dt='partition1'"
# default numPartitions is 1
numPartitions:100
# maxcompute sql sentence only uses table name. make sure that table name is the same with {table}'s value'.
sentence:"select id, maxcompute-field-0, maxcompute-field-1, maxcompute-field-2 from table where id < 10"
fields:[maxcompute-field-0, maxcompute-field-1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,11 @@ object Configs {
} else {
null
}
val numPartitions = if (config.hasPath("numPartitions")) {
config.getString("numPartitions")
} else {
"1"
}

val sentence = if (config.hasPath("sentence")) {
config.getString("sentence")
Expand All @@ -704,6 +709,7 @@ object Configs {
config.getString("accessKeyId"),
config.getString("accessKeySecret"),
partitionSpec,
numPartitions,
sentence
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ case class MaxComputeConfigEntry(override val category: SourceCategory.Value,
accessKeyId: String,
accessKeySecret: String,
partitionSpec: String,
numPartitions: String,
override val sentence: String)
extends ServerDataSourceConfigEntry {
require(
Expand All @@ -239,7 +240,7 @@ case class MaxComputeConfigEntry(override val category: SourceCategory.Value,
override def toString: String = {
s"MaxCompute source {odpsUrl: $odpsUrl, tunnelUrl: $tunnelUrl, table: $table, project: $project, " +
s"keyId: $accessKeyId, keySecret: $accessKeySecret, partitionSpec:$partitionSpec, " +
s"sentence:$sentence}"
s"numPartitions:$numPartitions, sentence:$sentence}"
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class MaxcomputeReader(override val session: SparkSession, maxComputeConfig: Max
.option("project", maxComputeConfig.project)
.option("accessKeyId", maxComputeConfig.accessKeyId)
.option("accessKeySecret", maxComputeConfig.accessKeySecret)
.option("numPartitions", maxComputeConfig.numPartitions)

// if use partition read
if (maxComputeConfig.partitionSpec != null) {
Expand Down

0 comments on commit 389bead

Please sign in to comment.