Skip to content

Commit

Permalink
Add more message if read/write to StorageDeltaMerge throw exception
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Mar 17, 2020
1 parent 7470c2f commit 716ae4a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
11 changes: 10 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,16 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
query_info.mvcc_query_info->concurrent = 0.0;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
try
{
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
}
catch (DB::Exception & e)
{
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
throw;
}

if (pipeline.streams.empty())
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ class DeltaMergeStore : private boost::noncopyable
const Settings & settings_);
~DeltaMergeStore();

const String & getDatabaseName() const { return db_name; }
const String & getTableName() const { return table_name; }

void shutdown();

void write(const Context & db_context, const DB::Settings & db_settings, const Block & block);
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class DMBlockOutputStream : public IBlockOutputStream
Block getHeader() const override { return header; }

void write(const Block & block) override
try
{
if (db_settings.dm_insert_max_rows == 0)
{
Expand All @@ -283,6 +284,11 @@ class DMBlockOutputStream : public IBlockOutputStream
}
}
}
catch (DB::Exception & e)
{
e.addMessage("(while writing to table `" + store->getDatabaseName() + "`.`" + store->getTableName() + "`)");
throw;
}

private:
DeltaMergeStorePtr store;
Expand Down Expand Up @@ -1028,7 +1034,7 @@ void StorageDeltaMerge::shutdown()
{
bool v = false;
if (!shutdown_called.compare_exchange_strong(v, true))
return ;
return;

store->shutdown();
}
Expand Down

0 comments on commit 716ae4a

Please sign in to comment.