Skip to content

Commit

Permalink
Update syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Oct 18, 2023
1 parent bde22f1 commit d2fb937
Showing 1 changed file with 28 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testThriftOptionalFieldsWithReadProjectionUsingParquetSchema() throw

@Test
public void testPullingInRequiredStructWithFilter() throws Exception {
final String projectionFilterDesc = "persons/{id};persons/email";
final String projectionFilterDesc = "persons.{id};persons.email";
TBase toWrite = new AddressBook(
Arrays.asList(
new Person(
Expand Down Expand Up @@ -122,8 +122,7 @@ public void testReorderdOptionalFields() throws Exception {

@Test
public void testProjectOutOptionalFields() throws Exception {

final String projectionFilterDesc = "persons/name/*";
final String projectionFilterDesc = "persons.name.*";

TBase toWrite = new AddressBook(
Arrays.asList(
Expand Down Expand Up @@ -164,7 +163,7 @@ public void testPullInRequiredMaps() throws Exception {

@Test
public void testDropMapValuePrimitive() throws Exception {
String filter = "mavalue/key";
String filter = "mavalue.key";

Map<String, String> mapValue = new HashMap<String, String>();
mapValue.put("a", "1");
Expand Down Expand Up @@ -199,7 +198,7 @@ private StructV4WithExtracStructField makeStructV4WithExtracStructField(String i

@Test
public void testDropMapValueStruct() throws Exception {
String filter = "reqMap/key";
String filter = "reqMap.key";

Map<String, StructV4WithExtracStructField> mapValue = new HashMap<String, StructV4WithExtracStructField>();

Expand All @@ -222,7 +221,7 @@ public void testDropMapValueStruct() throws Exception {

@Test
public void testDropMapValueNestedPrim() throws Exception {
String filter = "reqMap/key";
String filter = "reqMap.key";

Map<String, Map<String, String>> mapValue =
new HashMap<String, Map<String, String>>();
Expand Down Expand Up @@ -261,10 +260,9 @@ public void testDropMapValueNestedPrim() throws Exception {

@Test
public void testDropMapValueNestedStruct() throws Exception {
String filter = "reqMap/key";
String filter = "reqMap.key";

Map<String, Map<String, StructV4WithExtracStructField>> mapValue =
new HashMap<String, Map<String, StructV4WithExtracStructField>>();
Map<String, Map<String, StructV4WithExtracStructField>> mapValue = new HashMap<>();

Map<String, StructV4WithExtracStructField> innerValue1 = new HashMap<String, StructV4WithExtracStructField>();
innerValue1.put("inner key (1, 1)", makeStructV4WithExtracStructField("inner (1, 1)"));
Expand Down Expand Up @@ -353,29 +351,29 @@ private void shouldDoProjectionWithThriftColumnFilter(String filterDesc, TBase t
//create a test file
final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, thriftClass);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));

recordToWrite.write(protocol);
w.write(new BytesWritable(baos.toByteArray()));
w.close();

try (ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, thriftClass);
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
recordToWrite.write(protocol);
w.write(new BytesWritable(baos.toByteArray()));
}

final ParquetThriftInputFormat<T> parquetThriftInputFormat = new ParquetThriftInputFormat<T>();
final Job job = new Job(conf, "read");
job.setInputFormatClass(ParquetThriftInputFormat.class);
ParquetThriftInputFormat.setInputPaths(job, parquetFile);
final JobID jobID = new JobID("local", 1);
List<InputSplit> splits = parquetThriftInputFormat.getSplits(ContextUtil.newJobContext(ContextUtil.getConfiguration(job), jobID));
T readValue = null;
for (InputSplit split : splits) {
TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID(new TaskID(jobID, true, 1), 0));
try (final RecordReader<Void, T> reader = parquetThriftInputFormat.createRecordReader(split, taskAttemptContext)) {
reader.initialize(split, taskAttemptContext);
if (reader.nextKeyValue()) {
readValue = reader.getCurrentValue();
LOG.info("{}", readValue);

final ParquetThriftInputFormat<T> parquetThriftInputFormat = new ParquetThriftInputFormat<>();
try (Job job = new Job(conf, "read")) {
job.setInputFormatClass(ParquetThriftInputFormat.class);
ParquetThriftInputFormat.setInputPaths(job, parquetFile);
final JobID jobID = new JobID("local", 1);
List<InputSplit> splits = parquetThriftInputFormat.getSplits(ContextUtil.newJobContext(ContextUtil.getConfiguration(job), jobID));
for (InputSplit split : splits) {
TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID(new TaskID(jobID, true, 1), 0));
try (final RecordReader<Void, T> reader = parquetThriftInputFormat.createRecordReader(split, taskAttemptContext)) {
reader.initialize(split, taskAttemptContext);
if (reader.nextKeyValue()) {
readValue = reader.getCurrentValue();
LOG.info("{}", readValue);
}
}
}
}
Expand Down

0 comments on commit d2fb937

Please sign in to comment.