-
Notifications
You must be signed in to change notification settings - Fork 315
/
DataFrameWriter.cs
240 lines (214 loc) · 9.68 KB
/
DataFrameWriter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using Microsoft.Spark.Interop.Internal.Java.Util;
using Microsoft.Spark.Interop.Ipc;
namespace Microsoft.Spark.Sql
{
    /// <summary>
    /// Interface used to write a DataFrame to external storage systems (e.g. file systems,
    /// key-value stores, etc).
    /// </summary>
    /// <remarks>
    /// Each configuration method forwards its arguments to the wrapped JVM writer object and
    /// returns this same instance, so calls can be chained before a terminal method such as
    /// <see cref="Save(string)"/> or <see cref="SaveAsTable(string)"/>.
    /// </remarks>
    public sealed class DataFrameWriter : IJvmObjectReferenceProvider
    {
        private readonly JvmObjectReference _jvmObject;

        internal DataFrameWriter(JvmObjectReference jvmObject) => _jvmObject = jvmObject;

        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

        /// <summary>
        /// Specifies the behavior when data or table already exists.
        /// </summary>
        /// <remarks>
        /// Options include:
        /// - SaveMode.Overwrite: overwrite the existing data.
        /// - SaveMode.Append: append the data.
        /// - SaveMode.Ignore: ignore the operation (i.e. no-op).
        /// - SaveMode.ErrorIfExists: default option, throw an exception at runtime.
        /// The enum member name (via ToString()) is forwarded to <see cref="Mode(string)"/>.
        /// </remarks>
        /// <param name="saveMode">Save mode enum</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Mode(SaveMode saveMode) => Mode(saveMode.ToString());

        /// <summary>
        /// Specifies the behavior when data or table already exists.
        /// </summary>
        /// <remarks>
        /// Options include:
        /// - "overwrite": overwrite the existing data.
        /// - "append": append the data.
        /// - "ignore": ignore the operation (i.e. no-op).
        /// - "error" or "errorifexists": default option, throw an exception at runtime.
        /// </remarks>
        /// <param name="saveMode">Save mode string</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Mode(string saveMode)
        {
            _jvmObject.Invoke("mode", saveMode);
            return this;
        }

        /// <summary>
        /// Specifies the underlying output data source. Built-in options include
        /// "parquet", "json", etc.
        /// </summary>
        /// <param name="source">Data source name</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Format(string source)
        {
            _jvmObject.Invoke("format", source);
            return this;
        }

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Value of the option</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Option(string key, string value) => OptionInternal(key, value);

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Value of the option</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Option(string key, bool value) => OptionInternal(key, value);

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Value of the option</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Option(string key, long value) => OptionInternal(key, value);

        /// <summary>
        /// Adds an output option for the underlying data source.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Value of the option</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Option(string key, double value) => OptionInternal(key, value);

        /// <summary>
        /// Adds output options for the underlying data source.
        /// </summary>
        /// <param name="options">Key/value options</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter Options(Dictionary<string, string> options)
        {
            _jvmObject.Invoke("options", options);
            return this;
        }

        /// <summary>
        /// Partitions the output by the given columns on the file system. If specified,
        /// the output is laid out on the file system similar to Hive's partitioning scheme.
        /// </summary>
        /// <param name="colNames">Column names to partition by</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter PartitionBy(params string[] colNames)
        {
            // The (object) cast stops the string[] from binding directly as Invoke's
            // argument array (presumably Invoke takes `params object[]`), so the column
            // names reach the JVM as a single array argument.
            _jvmObject.Invoke("partitionBy", (object)colNames);
            return this;
        }

        /// <summary>
        /// Buckets the output by the given columns. If specified, the output is laid out
        /// on the file system similar to Hive's bucketing scheme.
        /// </summary>
        /// <param name="numBuckets">Number of buckets to save</param>
        /// <param name="colName">A column name</param>
        /// <param name="colNames">Additional column names</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter BucketBy(
            int numBuckets,
            string colName,
            params string[] colNames)
        {
            _jvmObject.Invoke("bucketBy", numBuckets, colName, colNames);
            return this;
        }

        /// <summary>
        /// Sorts the output in each bucket by the given columns.
        /// </summary>
        /// <param name="colName">A name of a column</param>
        /// <param name="colNames">Additional column names</param>
        /// <returns>This DataFrameWriter object</returns>
        public DataFrameWriter SortBy(string colName, params string[] colNames)
        {
            _jvmObject.Invoke("sortBy", colName, colNames);
            return this;
        }

        /// <summary>
        /// Saves the content of the DataFrame at the specified path.
        /// </summary>
        /// <param name="path">Path to save the content</param>
        public void Save(string path) => _jvmObject.Invoke("save", path);

        /// <summary>
        /// Saves the content of the DataFrame using the format and options previously
        /// configured on this writer. Unlike <see cref="Save(string)"/>, no path argument
        /// is passed to the JVM writer, so the destination must come from that configuration.
        /// </summary>
        public void Save() => _jvmObject.Invoke("save");

        /// <summary>
        /// Inserts the content of the DataFrame to the specified table. It requires that
        /// the schema of the DataFrame is the same as the schema of the table.
        /// </summary>
        /// <param name="tableName">Name of the table</param>
        public void InsertInto(string tableName) => _jvmObject.Invoke("insertInto", tableName);

        /// <summary>
        /// Saves the content of the DataFrame as the specified table.
        /// </summary>
        /// <param name="tableName">Name of the table</param>
        public void SaveAsTable(string tableName) => _jvmObject.Invoke("saveAsTable", tableName);

        /// <summary>
        /// Saves the content of the DataFrame to an external database table via JDBC.
        /// </summary>
        /// <param name="url">JDBC database URL of the form "jdbc:subprotocol:subname"</param>
        /// <param name="table">Name of the table in the external database</param>
        /// <param name="properties">JDBC database connection arguments</param>
        public void Jdbc(string url, string table, Dictionary<string, string> properties) =>
            _jvmObject.Invoke("jdbc", url, table, new Properties(_jvmObject.Jvm, properties));

        /// <summary>
        /// Saves the content of the DataFrame in JSON format at the specified path.
        /// </summary>
        /// <param name="path">Path to save the content</param>
        public void Json(string path) => _jvmObject.Invoke("json", path);

        /// <summary>
        /// Saves the content of the DataFrame in Parquet format at the specified path.
        /// </summary>
        /// <param name="path">Path to save the content</param>
        public void Parquet(string path) => _jvmObject.Invoke("parquet", path);

        /// <summary>
        /// Saves the content of the DataFrame in ORC format at the specified path.
        /// </summary>
        /// <param name="path">Path to save the content</param>
        public void Orc(string path) => _jvmObject.Invoke("orc", path);

        /// <summary>
        /// Saves the content of the DataFrame in a text file at the specified path.
        /// </summary>
        /// <param name="path">Path to save the content</param>
        public void Text(string path) => _jvmObject.Invoke("text", path);

        /// <summary>
        /// Saves the content of the DataFrame in CSV format at the specified path.
        /// </summary>
        /// <param name="path">Path to save the content</param>
        public void Csv(string path) => _jvmObject.Invoke("csv", path);

        /// <summary>
        /// Shared implementation behind the typed <c>Option</c> overloads: forwards the
        /// key/value pair to the JVM writer's "option" method.
        /// </summary>
        /// <param name="key">Name of the option</param>
        /// <param name="value">Value of the option (string, bool, long or double from the public overloads)</param>
        /// <returns>This DataFrameWriter object</returns>
        private DataFrameWriter OptionInternal(string key, object value)
        {
            _jvmObject.Invoke("option", key, value);
            return this;
        }
    }
}