Skip to content

Commit

Permalink
fix message order for destiantion acceptance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yurii-bidiuk committed Dec 1, 2021
1 parent 3b52744 commit 2603ad8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,21 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1016,12 +1022,16 @@ protected void assertSameMessages(final List<AirbyteMessage> expected,
.map(AirbyteMessage::getRecord)
.peek(recordMessage -> recordMessage.setEmittedAt(null))
.map(recordMessage -> pruneAirbyteInternalFields ? safePrune(recordMessage) : recordMessage)
.map(recordMessage -> recordMessage.getData())
.map(AirbyteRecordMessage::getData)
.peek(this::sortDataFields)
.sorted(Comparator.comparing(JsonNode::toString))
.collect(Collectors.toList());

final List<JsonNode> actualProcessed = actual.stream()
.map(recordMessage -> pruneAirbyteInternalFields ? safePrune(recordMessage) : recordMessage)
.map(recordMessage -> recordMessage.getData())
.map(AirbyteRecordMessage::getData)
.peek(this::sortDataFields)
.sorted(Comparator.comparing(JsonNode::toString))
.collect(Collectors.toList());

assertSameData(expectedProcessed, actualProcessed);
Expand Down Expand Up @@ -1059,6 +1069,21 @@ private void assertSameData(final List<JsonNode> expected, final List<JsonNode>
}
}

/**
* Method that will sort all fields by name and rewrite JsonNode in sorted order
*
* @param data - data node from AirbyteMessage
*/
private void sortDataFields(JsonNode data) {
var sortedFields = StreamSupport.stream(Spliterators.spliteratorUnknownSize(data.fields(),
Spliterator.ORDERED), false)
.sorted(Entry.comparingByKey(Comparator.comparing(String::toLowerCase)))
.collect(Collectors.toList());
((ObjectNode) data).removeAll();
IntStream.range(0, sortedFields.size())
.forEach(i -> ((ObjectNode) data).set(sortedFields.get(i).getKey(), sortedFields.get(i).getValue()));
}

// Allows subclasses to implement custom comparison asserts
protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) {
assertEquals(expectedValue, actualValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,13 @@
{"type": "RECORD", "record": {"stream": "reserved_keywords", "emitted_at": 1602637589000, "data": { "order" : "ascending" }}}
{"type": "RECORD", "record": {"stream": "groups", "emitted_at": 1602637589000, "data": { "authorization" : "into" }}}
{"type": "RECORD", "record": {"stream": "ProperCase", "emitted_at": 1602637589000, "data": { "ProperCase" : true }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589200, "data": { "some_id" : 101, "some_field" : "some_field_1", "some_next_field" : "some_next_field_1" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589200, "data": { "some_id" : 101, "some_next_field" : "some_next_field_1", "some_field" : "some_field_1" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589250, "data": { "some_id" : 102, "some_field" : "some_field_2" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589300, "data": { "some_id" : 103, "some_next_field" : "some_next_field_3" }}}
{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589350, "data": { "some_id" : 104 }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589400, "data": { "some_id" : 201, "next_field_name" : "next_field_name_1" }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : 202, "next_field_name" : "next_field_name_2" }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589400, "data": { "next_field_name" : "next_field_name_1", "some_id" : 201 }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "next_field_name" : "next_field_name_2", "some_id" : 202 }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : 203 }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-1-1", "some_datetime" : "2021-1-1 01:01:01" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-11", "some_datetime" : "2021-1-1 01:01:01 +1" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-15", "some_datetime" : "2021-1-1T01:01:01 +1:00" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-16", "some_datetime" : "2021-01-01 01:01:01" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-11-13", "some_datetime" : "2021-01-01 01:01:01 +0000" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-10-12", "some_datetime" : "2021-01-01T01:01:01Z" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-12-17", "some_datetime" : "2021-01-01T01:01:01-01:00" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-18", "some_datetime" : "2021-01-01T01:01:01 +1:00" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-19", "some_datetime" : "2021-01-01 01:01:01 UTC" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-20", "some_datetime" : "2021-01-01T01:01:01 UTC" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-21", "some_datetime" : "2021-01-01T01:01:01 +1" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-22", "some_datetime" : "2021-01-01T01:01:01 +0000" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-23", "some_datetime" : "2021-01-01T01:01:01+0000" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "some_date" : "2021-01-24", "some_datetime" : "2021-01-01T01:01:01+1" }}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "date" : "2021-1-1"}}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "date" : "1/1/2021"}}}
{"type": "RECORD", "record": {"stream": "stream_with_dates", "emitted_at": 1602637589000, "data": { "date" : "20210101"}}}
Expand Down

0 comments on commit 2603ad8

Please sign in to comment.