Merge branch 'jsoneachrow_import_nested' of https://github.com/veloman-yunkan/ClickHouse into veloman-yunkan-jsoneachrow_import_nested
alexey-milovidov committed Sep 17, 2018
2 parents 8ba2ec9 + f51d791 commit 8852660
Showing 8 changed files with 203 additions and 70 deletions.
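
In short: this commit adds a new input_format_import_nested_json setting (off by default). When it is enabled, the JSONEachRow input format maps a nested JSON object such as {"n": {"s": ["abc"], "i": [1]}} onto the Nested columns n.s and n.i, i.e. it is imported the same way as the already-supported flattened form {"n.s": ["abc"], "n.i": [1]}.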
1 change: 1 addition & 0 deletions dbms/src/Formats/FormatFactory.cpp
@@ -41,6 +41,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
+format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
1 change: 1 addition & 0 deletions dbms/src/Formats/FormatSettings.h
@@ -49,6 +49,7 @@ struct FormatSettings

bool skip_unknown_fields = false;
bool write_statistics = true;
+bool import_nested_json = false;

enum class DateTimeInputFormat
{
192 changes: 123 additions & 69 deletions dbms/src/Formats/JSONEachRowRowInputStream.cpp
@@ -3,7 +3,7 @@
#include <Formats/JSONEachRowRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>

+#include <DataTypes/NestedUtils.h>

namespace DB
{
@@ -14,6 +14,17 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}

+namespace
+{
+
+enum
+{
+    UNKNOWN_FIELD = size_t(-1),
+    NESTED_FIELD = size_t(-2)
+};
+
+} // unnamed namespace


JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings)
: istr(istr_), header(header_), format_settings(format_settings), name_map(header.columns())
@@ -23,17 +23,42 @@ JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const B

size_t num_columns = header.columns();
for (size_t i = 0; i < num_columns; ++i)
-        name_map[header.safeGetByPosition(i).name] = i;        /// NOTE You could place names more cache-locally.
+    {
+        const String& colname = columnName(i);
+        name_map[colname] = i;        /// NOTE You could place names more cache-locally.
+        if ( format_settings.import_nested_json )
+        {
+            const auto splitted = Nested::splitName(colname);
+            if ( ! splitted.second.empty() )
+            {
+                const StringRef table_name(colname.data(), splitted.first.size());
+                name_map[table_name] = NESTED_FIELD;
+            }
+        }
+    }
}
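/// Illustration (editor's example, not part of the commit): assuming a header with
/// columns (d1 UInt8, d2 String, n.s Array(String), n.i Array(Int32)) and
/// import_nested_json enabled, the loop above builds:
///     name_map["d1"]  = 0          name_map["d2"]  = 1
///     name_map["n.s"] = 2          name_map["n.i"] = 3
///     name_map["n"]   = NESTED_FIELD
/// Nested::splitName("n.s") yields {"n", "s"}, so every column name containing a dot
/// also registers its prefix under the NESTED_FIELD sentinel.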

+const String& JSONEachRowRowInputStream::columnName(size_t i) const
+{
+    return header.safeGetByPosition(i).name;
+}

+size_t JSONEachRowRowInputStream::columnIndex(const StringRef& name) const
+{
+    /// NOTE Optimization is possible by caching the order of fields (which is almost always the same)
+    /// and a quick check to match the next expected field, instead of searching the hash table.

+    const auto it = name_map.find(name);
+    return name_map.end() == it ? UNKNOWN_FIELD : it->second;
+}

-/** Read the field name in JSON format.
-  * A reference to the field name will be written to ref.
-  * You can also use temporary `tmp` buffer to copy field name there.
+/** Read the field name and convert it to column name
+  * (taking into account the current nested name prefix)
  */
-static StringRef readName(ReadBuffer & buf, String & tmp)
+StringRef JSONEachRowRowInputStream::readColumnName(ReadBuffer & buf)
{
-    if (buf.position() + 1 < buf.buffer().end())
+    // This is just an optimization: try to avoid copying the name into current_column_name
+    if (nested_prefix_length == 0 && buf.position() + 1 < buf.buffer().end())
{
const char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end());

@@ -48,8 +84,9 @@ static StringRef readName(ReadBuffer & buf, String & tmp)
}
}

-    readJSONString(tmp, buf);
-    return tmp;
+    current_column_name.resize(nested_prefix_length);
+    readJSONStringInto(current_column_name, buf);
+    return current_column_name;
}
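/// Illustration (editor's example, not part of the commit): while a nested object for
/// table "n" is being read, current_column_name holds the prefix "n." and
/// nested_prefix_length == 2; reading the key "s" appends to that prefix, so the
/// returned name is the full column name "n.s". The zero-copy fast path above is only
/// taken when nested_prefix_length == 0, i.e. outside of any nested object.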


@@ -60,89 +97,106 @@ static void skipColonDelimeter(ReadBuffer & istr)
skipWhitespaceIfAny(istr);
}

+void JSONEachRowRowInputStream::skipUnknownField(const StringRef& name_ref)
+{
+    if (!format_settings.skip_unknown_fields)
+        throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);

-bool JSONEachRowRowInputStream::read(MutableColumns & columns)
+    skipJSONField(istr, name_ref);
+}

+void JSONEachRowRowInputStream::readField(size_t index, MutableColumns & columns)
{
-    skipWhitespaceIfAny(istr);
+    if (read_columns[index])
+        throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA);

-    /// We consume ;, or \n before scanning a new row, instead scanning to next row at the end.
-    /// The reason is that if we want an exact number of rows read with LIMIT x
-    /// from a streaming table engine with text data format, like File or Kafka
-    /// then seeking to next ;, or \n would trigger reading of an extra row at the end.
+    read_columns[index] = true;

-    /// Semicolon is added for convenience as it could be used at end of INSERT query.
-    if (!istr.eof() && (*istr.position() == ',' || *istr.position() == ';'))
-        ++istr.position();
+    try
+    {
+        header.getByPosition(index).type->deserializeTextJSON(*columns[index], istr, format_settings);
+    }
+    catch (Exception & e)
+    {
+        e.addMessage("(while read the value of key " + columnName(index) + ")");
+        throw;
+    }
+}

+bool JSONEachRowRowInputStream::advanceToNextKey(size_t key_index)
+{
+    skipWhitespaceIfAny(istr);

+    if (istr.eof())
+        throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
+    else if (*istr.position() == '}')
+    {
+        ++istr.position();
+        return false;
+    }

-    assertChar('{', istr);

-    size_t num_columns = columns.size();
+    if (key_index > 0)
    {
        assertChar(',', istr);
        skipWhitespaceIfAny(istr);
    }
+    return true;
+}

-    /// Set of columns for which the values were read. The rest will be filled with default values.
-    /// TODO Ability to provide your DEFAULTs.
-    bool read_columns[num_columns];
-    memset(read_columns, 0, num_columns);
+void JSONEachRowRowInputStream::readJSONObject(MutableColumns & columns)
{
+    assertChar('{', istr);

-    bool first = true;
-    while (true)
+    for ( size_t key_index = 0 ; advanceToNextKey(key_index) ; ++key_index )
    {
-        skipWhitespaceIfAny(istr);
+        StringRef name_ref = readColumnName(istr);

-        if (istr.eof())
-            throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
-        else if (*istr.position() == '}')
-        {
-            ++istr.position();
-            break;
-        }
+        skipColonDelimeter(istr);

-        if (first)
-            first = false;
+        const size_t column_index = columnIndex(name_ref);
+        if ( column_index == UNKNOWN_FIELD )
+            skipUnknownField(name_ref);
+        else if ( column_index == NESTED_FIELD )
+            readNestedData(name_ref.toString(), columns);
        else
-        {
-            assertChar(',', istr);
-            skipWhitespaceIfAny(istr);
-        }

-        StringRef name_ref = readName(istr, name_buf);
+            readField(column_index, columns);
    }
+}

-    /// NOTE Optimization is possible by caching the order of fields (which is almost always the same)
-    /// and a quick check to match the next expected field, instead of searching the hash table.
+void JSONEachRowRowInputStream::readNestedData(const String& name, MutableColumns & columns)
{
+    current_column_name = name;
+    current_column_name.push_back('.');
+    nested_prefix_length = current_column_name.size();
+    readJSONObject(columns);
+    nested_prefix_length = 0;
+}
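/// Illustration (editor's example, not part of the commit): for the input
/// {"n": {"s": ["abc"], "i": [1]}}, readJSONObject finds "n" mapped to NESTED_FIELD and
/// calls readNestedData("n", columns), which sets the prefix "n." and re-enters
/// readJSONObject; the inner keys "s" and "i" are then dispatched as the ordinary
/// columns "n.s" and "n.i". The prefix is reset on the way out, so later top-level
/// keys are unaffected.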

-        auto it = name_map.find(name_ref);
-        if (name_map.end() == it)
-        {
-            if (!format_settings.skip_unknown_fields)
-                throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
+bool JSONEachRowRowInputStream::read(MutableColumns & columns)
{
+    skipWhitespaceIfAny(istr);

-            skipColonDelimeter(istr);
-            skipJSONField(istr, name_ref);
-            continue;
-        }
+    /// We consume ;, or \n before scanning a new row, instead scanning to next row at the end.
+    /// The reason is that if we want an exact number of rows read with LIMIT x
+    /// from a streaming table engine with text data format, like File or Kafka
+    /// then seeking to next ;, or \n would trigger reading of an extra row at the end.

-        size_t index = it->second;
+    /// Semicolon is added for convenience as it could be used at end of INSERT query.
+    if (!istr.eof() && (*istr.position() == ',' || *istr.position() == ';'))
+        ++istr.position();

-        if (read_columns[index])
-            throw Exception("Duplicate field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
+    skipWhitespaceIfAny(istr);
+    if (istr.eof())
+        return false;

-        skipColonDelimeter(istr);
+    size_t num_columns = columns.size();

-        read_columns[index] = true;
+    /// Set of columns for which the values were read. The rest will be filled with default values.
+    /// TODO Ability to provide your DEFAULTs.
+    read_columns.assign(num_columns, false);

-        try
-        {
-            header.getByPosition(index).type->deserializeTextJSON(*columns[index], istr, format_settings);
-        }
-        catch (Exception & e)
-        {
-            e.addMessage("(while read the value of key " + name_ref.toString() + ")");
-            throw;
-        }
-    }
+    nested_prefix_length = 0;
+    readJSONObject(columns);

    /// Fill non-visited columns with the default values.
    for (size_t i = 0; i < num_columns; ++i)
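The dispatch scheme above can be summarized in a small standalone program (an editor's sketch, not ClickHouse code: std::unordered_map stands in for the HashMap used by the stream, splitting at the first dot stands in for Nested::splitName, and the column names are assumed):

#include <cstddef>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

static constexpr size_t UNKNOWN_FIELD = static_cast<size_t>(-1);
static constexpr size_t NESTED_FIELD = static_cast<size_t>(-2);

int main()
{
    // Assumed header: d1, d2 are flat columns; n.s, n.i belong to Nested table "n".
    const std::vector<std::string> columns = {"d1", "d2", "n.s", "n.i"};
    std::unordered_map<std::string, size_t> name_map;

    for (size_t i = 0; i < columns.size(); ++i)
    {
        name_map[columns[i]] = i;
        const size_t dot = columns[i].find('.');
        if (dot != std::string::npos)
            name_map[columns[i].substr(0, dot)] = NESTED_FIELD;  // prefix, e.g. "n"
    }

    // Keys as they might appear in one JSONEachRow object.
    for (const std::string key : {"d1", "n.s", "n", "bogus"})
    {
        const auto it = name_map.find(key);
        const size_t index = (it == name_map.end()) ? UNKNOWN_FIELD : it->second;
        if (index == UNKNOWN_FIELD)
            std::cout << key << " -> skipUnknownField\n";
        else if (index == NESTED_FIELD)
            std::cout << key << " -> readNestedData (prefix \"" << key << ".\")\n";
        else
            std::cout << key << " -> readField(" << index << ")\n";
    }
    return 0;
}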
24 changes: 23 additions & 1 deletion dbms/src/Formats/JSONEachRowRowInputStream.h
@@ -26,14 +26,36 @@ class JSONEachRowRowInputStream : public IRowInputStream
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;

+private:
+    const String& columnName(size_t i) const;
+    size_t columnIndex(const StringRef& name) const;
+    bool advanceToNextKey(size_t key_index);
+    void skipUnknownField(const StringRef& name_ref);
+    StringRef readColumnName(ReadBuffer & buf);
+    void readField(size_t index, MutableColumns & columns);
+    void readJSONObject(MutableColumns & columns);
+    void readNestedData(const String& name, MutableColumns & columns);

private:
ReadBuffer & istr;
Block header;

const FormatSettings format_settings;

    /// Buffer for the read from the stream field name. Used when you have to copy it.
-    String name_buf;
+    /// Also, if processing of Nested data is in progress, it holds the common prefix
+    /// of the nested column names (so that appending the field name to it produces
+    /// the full column name)
+    String current_column_name;

+    /// If processing Nested data, holds the length of the common prefix
+    /// of the names of related nested columns. For example, for a table
+    /// created as follows
+    ///        CREATE TABLE t (n Nested (i Int32, s String))
+    /// the nested column names are 'n.i' and 'n.s' and the nested prefix is 'n.'
+    size_t nested_prefix_length = 0;

+    std::vector<bool> read_columns;

/// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
1 change: 1 addition & 0 deletions dbms/src/IO/ReadHelpers.cpp
@@ -667,6 +667,7 @@ void readJSONString(String & s, ReadBuffer & buf)
template void readJSONStringInto<PaddedPODArray<UInt8>, void>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template bool readJSONStringInto<PaddedPODArray<UInt8>, bool>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readJSONStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
+template void readJSONStringInto<String>(String & s, ReadBuffer & buf);


template <typename ReturnType>
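The added instantiation backs the new readColumnName(): it reads the key into a plain String (current_column_name) via readJSONStringInto, a template that was previously instantiated only for PaddedPODArray and NullSink sinks.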
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
@@ -148,6 +148,7 @@ struct Settings
M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \
\
M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats).") \
+M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
\
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
\
15 changes: 15 additions & 0 deletions dbms/tests/queries/0_stateless/00715_json_each_row_input_nested.reference
@@ -0,0 +1,15 @@
1 ok ['abc','def'] [1,23]
0 [] []
0 ['x','y','z'] [45,67,8]
1 ok ['dog','cat','pig'] [3,3,3]
1 ok ['zero','negative one'] [0,-1]
1 ok [] []
0 [] []
0 [] []
1 ok [] []
1 ok [] []
1 ok ['abc','def'] [1,23]
0 [] []
0 ['x','y','z'] [45,67,8]
1 ok ['dog','cat','pig'] [3,3,3]
1 ok ['zero','negative one'] [0,-1]
38 changes: 38 additions & 0 deletions dbms/tests/queries/0_stateless/00715_json_each_row_input_nested.sh
@@ -0,0 +1,38 @@
#!/usr/bin/env bash

set -e

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh

$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.json_each_row_nested"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test.json_each_row_nested (d1 UInt8, d2 String, n Nested (s String, i Int32) ) ENGINE = Memory"

echo '{"d1" : 1, "d2" : "ok", "n.s" : ["abc", "def"], "n.i" : [1, 23]}
{ }
{"t1" : 0, "n.t2":true,"n.i":[45, 67, 8], "n.s":["x", "y", "z"],"t5":[],"t6":"trash" }
{"d2":"ok","n.s":["dog", "cat", "pig"], "n.x":[["1","2"]], "d1":"1", "n.i":[3, 3, 3]}
{"t0" : -0.1, "n.s" : ["zero","negative one"], "a.b" : 0, "n.i" : [0, -1], "d2" : "ok", "d1" : 1}' \
| $CLICKHOUSE_CLIENT --input_format_skip_unknown_fields=1 -q "INSERT INTO test.json_each_row_nested FORMAT JSONEachRow"

$CLICKHOUSE_CLIENT --max_threads=1 -q "SELECT * FROM test.json_each_row_nested"

test_nested_json()
{
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.json_each_row_nested"

$CLICKHOUSE_CLIENT -q "CREATE TABLE test.json_each_row_nested (d1 UInt8, d2 String, n Nested (s String, i Int32) ) ENGINE = Memory"

echo '{"d1" : 1, "d2" : "ok", "n" : { "s" : ["abc", "def"], "i" : [1, 23]} }
{ }
{"t1" : 0, "n.t2":true,"n" : {"i":[45, 67, 8], "s":["x", "y", "z"]}, "t5":[],"t6":"trash" }
{"d2":"ok","n" : {"s":["dog", "cat", "pig"], "x":[["1","2"]], "i":[3, 3, 3]}, "d1":"1", "n.j":[4, 4, 4]}
{"t0" : -0.1, "n": {"s" : ["zero","negative one"], "i" : [0, -1]}, "d2" : "ok", "d1" : 1}' \
| $CLICKHOUSE_CLIENT "$@" --input_format_skip_unknown_fields=1 -q "INSERT INTO test.json_each_row_nested FORMAT JSONEachRow"

$CLICKHOUSE_CLIENT --max_threads=1 -q "SELECT * FROM test.json_each_row_nested"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.json_each_row_nested"
}

test_nested_json
test_nested_json --input_format_import_nested_json=1
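
Reading the two runs against the expected output above: rows 1-5 come from the flattened "n.s"/"n.i" insert, rows 6-10 from test_nested_json without the new setting (each nested "n" object is an unknown field and is skipped, hence the empty arrays), and rows 11-15 from the run with --input_format_import_nested_json=1, which matches the flattened results.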
