Skip to content

Commit

Permalink
Use intellij java google style to fix import ordering (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
baskaranz authored and feast-ci-bot committed Jan 7, 2019
1 parent c642d6b commit a59813f
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.deserializer;

import com.google.protobuf.MessageLite;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -13,9 +29,11 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
Expand All @@ -28,31 +46,29 @@
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;

@RunWith(SpringRunner.class)
@EmbeddedKafka(controlledShutdown = true)
@SpringBootTest
@DirtiesContext
public class KafkaFeatureRowDeserializerTest {

@Autowired private EmbeddedKafkaBroker embeddedKafka;
private static final String topic = "TEST_TOPIC";

@ClassRule public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic);
@Autowired private KafkaTemplate<byte[], byte[]> template;

private <MessageType extends MessageLite> void deserialize(MessageType input) {
// generate a random UUID to create a unique topic and consumer group id for each test
String uuid = UUID.randomUUID().toString();
String topic = "topic-" + uuid;

embeddedKafka.addTopics(topic);

Deserializer deserializer = new FeatureRowDeserializer();

Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(uuid, Boolean.FALSE.toString(), embeddedKafka);
KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka.getEmbeddedKafka());
ConsumerFactory<FeatureRow, FeatureRow> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps, deserializer, deserializer);

Expand All @@ -63,7 +79,8 @@ private <MessageType extends MessageLite> void deserialize(MessageType input) {
MessageListenerContainer container =
new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
ContainerTestUtils.waitForAssignment(
container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());

byte[] data = input.toByteArray();
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, data, data);
Expand Down Expand Up @@ -99,12 +116,10 @@ public void deserializeFeatureRowProto() {

@Configuration
static class ContextConfiguration {

@Autowired private EmbeddedKafkaBroker embeddedKafka;

@Bean
ProducerFactory<byte[], byte[]> producerFactory() {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());

return new DefaultKafkaProducerFactory<>(
producerProps, new ByteArraySerializer(), new ByteArraySerializer());
Expand Down

0 comments on commit a59813f

Please sign in to comment.