From 8e2ed1125af851e20c3ab60adfa810569d6f493e Mon Sep 17 00:00:00 2001 From: Andrew Slabko Date: Wed, 8 Apr 2026 09:27:52 +0200 Subject: [PATCH] Make ReceivePacket return data that it has processed --- clickhouse/client.cpp | 197 ++++++++++++++++++++++++++++-------------- 1 file changed, 131 insertions(+), 66 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index a8c8ea64..824df923 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -146,6 +147,51 @@ std::unique_ptr GetEndpointsIterator(const ClientOptions& } + +template +struct overloaded : Ts... { using Ts::operator()...; }; +template +overloaded(Ts...) -> overloaded; + +struct DataTag { + Block block; +}; +struct ExceptionTag { +}; +struct ProfileTag { + Profile profile; +}; +struct ProgressTag { + Progress progress; +}; +struct PongTag { +}; +struct HelloTag { +}; +struct LogTag { + Block block; +}; +struct TableColumnsTag { +}; +struct ProfileEventsTag { + Block block; +}; +struct EndOfStreamTag { +}; + +using EncodedPacket = std::variant< + std::monostate, + DataTag, + ExceptionTag, + ProfileTag, + ProgressTag, + PongTag, + HelloTag, + LogTag, + TableColumnsTag, + ProfileEventsTag, + EndOfStreamTag>; + class Client::Impl { public: Impl(const ClientOptions& opts); @@ -180,7 +226,8 @@ class Client::Impl { private: bool Handshake(); - bool ReceivePacket(uint64_t* server_packet = nullptr); + EncodedPacket ReceivePacket(uint64_t* server_packet = nullptr); + bool ProcessPacket(uint64_t* server_packet = nullptr); void SendQuery(const Query& query, bool finalize = true); void FinalizeQuery(); @@ -197,7 +244,7 @@ class Client::Impl { bool ReceiveHello(); /// Reads data packet form input stream. - bool ReceiveData(); + bool ReceiveData(Block * block); /// Reads exception packet form input stream. bool ReceiveException(bool rethrow = false); @@ -308,7 +355,7 @@ void Client::Impl::ExecuteQuery(Query query) { SendQuery(query); - while (ReceivePacket()) { + while (ProcessPacket()) { ; } } @@ -333,7 +380,7 @@ void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& ext SendExternalData(external_tables); FinalizeQuery(); - while (ReceivePacket()) { + while (ProcessPacket()) { ; } } @@ -408,7 +455,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer // Wait for a data packet and return uint64_t server_packet = 0; - while (ReceivePacket(&server_packet)) { + while (ProcessPacket(&server_packet)) { if (server_packet == ServerCodes::Data) { SendData(block); EndInsert(); @@ -443,7 +490,7 @@ Block Client::Impl::BeginInsert(Query query) { // Wait for a data packet and return uint64_t server_packet = 0; - while (ReceivePacket(&server_packet)) { + while (ProcessPacket(&server_packet)) { if (server_packet == ServerCodes::Data) { return block; } @@ -470,7 +517,7 @@ void Client::Impl::EndInsert() { // Wait for EOS. uint64_t eos_packet{0}; - while (ReceivePacket(&eos_packet)) { + while (ProcessPacket(&eos_packet)) { ; } @@ -491,7 +538,7 @@ void Client::Impl::Ping() { output_->Flush(); uint64_t server_packet; - const bool ret = ReceivePacket(&server_packet); + const bool ret = ProcessPacket(&server_packet); if (!ret || server_packet != ServerCodes::Pong) { throw ProtocolError("fail to ping server"); @@ -569,157 +616,176 @@ bool Client::Impl::Handshake() { return true; } -bool Client::Impl::ReceivePacket(uint64_t* server_packet) { +EncodedPacket Client::Impl::ReceivePacket(uint64_t* server_packet) { uint64_t packet_type = 0; if (!WireFormat::ReadVarint64(*input_, &packet_type)) { - return false; + return {}; } if (server_packet) { *server_packet = packet_type; } switch (packet_type) { + case ServerCodes::Data: { - if (!ReceiveData()) { + DataTag ret{}; + if (!ReceiveData(&ret.block)) { throw ProtocolError("can't read data packet from input stream"); } - return true; + return ret; } case ServerCodes::Exception: { + ExceptionTag ret{}; ReceiveException(); - return false; + return ret; } case ServerCodes::ProfileInfo: { - Profile profile; + ProfileTag ret{}; - if (!WireFormat::ReadUInt64(*input_, &profile.rows)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.profile.rows)) { + return {}; } - if (!WireFormat::ReadUInt64(*input_, &profile.blocks)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.profile.blocks)) { + return {}; } - if (!WireFormat::ReadUInt64(*input_, &profile.bytes)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.profile.bytes)) { + return {}; } - if (!WireFormat::ReadFixed(*input_, &profile.applied_limit)) { - return false; + if (!WireFormat::ReadFixed(*input_, &ret.profile.applied_limit)) { + return {}; } - if (!WireFormat::ReadUInt64(*input_, &profile.rows_before_limit)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.profile.rows_before_limit)) { + return {}; } - if (!WireFormat::ReadFixed(*input_, &profile.calculated_rows_before_limit)) { - return false; + if (!WireFormat::ReadFixed(*input_, &ret.profile.calculated_rows_before_limit)) { + return {}; } if (events_) { - events_->OnProfile(profile); + events_->OnProfile(ret.profile); } - return true; + return ret; } case ServerCodes::Progress: { - Progress info; + ProgressTag ret; - if (!WireFormat::ReadUInt64(*input_, &info.rows)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.progress.rows)) { + return {}; } - if (!WireFormat::ReadUInt64(*input_, &info.bytes)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.progress.bytes)) { + return {}; } if constexpr(DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) { - if (!WireFormat::ReadUInt64(*input_, &info.total_rows)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.progress.total_rows)) { + return {}; } } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO) { - if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.progress.written_rows)) { + return {}; } - if (!WireFormat::ReadUInt64(*input_, &info.written_bytes)) { - return false; + if (!WireFormat::ReadUInt64(*input_, &ret.progress.written_bytes)) { + return {}; } } if (events_) { - events_->OnProgress(info); + events_->OnProgress(ret.progress); } - return true; + return ret; } case ServerCodes::Pong: { - return true; + return PongTag{}; } case ServerCodes::Hello: { - return true; + return HelloTag{}; } case ServerCodes::EndOfStream: { if (events_) { events_->OnFinish(); } - return false; + return EndOfStreamTag{}; } case ServerCodes::Log: { // log tag if (!WireFormat::SkipString(*input_)) { - return false; + return {}; } - Block block; + LogTag ret; // Use uncompressed stream since log blocks usually contain only one row - if (!ReadBlock(*input_, &block)) { - return false; + if (!ReadBlock(*input_, &ret.block)) { + return {}; } if (events_) { - events_->OnServerLog(block); + events_->OnServerLog(ret.block); } - return true; + return ret; } case ServerCodes::TableColumns: { // external table name if (!WireFormat::SkipString(*input_)) { - return false; + return {}; } // columns metadata if (!WireFormat::SkipString(*input_)) { - return false; + return {}; } - return true; + return TableColumnsTag{}; } case ServerCodes::ProfileEvents: { if (!WireFormat::SkipString(*input_)) { - return false; + return {}; } - Block block; - if (!ReadBlock(*input_, &block)) { - return false; + ProfileEventsTag ret; + if (!ReadBlock(*input_, &ret.block)) { + return {}; } if (events_) { - events_->OnProfileEvents(block); + events_->OnProfileEvents(ret.block); } - return true; + return ret; } default: throw UnimplementedError("unimplemented " + std::to_string((int)packet_type)); - break; } } +bool Client::Impl::ProcessPacket(uint64_t* server_packet) { + auto packet = ReceivePacket(server_packet); + return std::visit(overloaded{ + [](std::monostate) { return false; }, + [](DataTag) { return true; }, + [](ExceptionTag) {return false; }, + [](ProfileTag){ return true; }, + [](ProgressTag){ return true; }, + [](PongTag){ return true; }, + [](HelloTag){ return true; }, + [](LogTag){ return true; }, + [](TableColumnsTag){ return true; }, + [](ProfileEventsTag){ return true; }, + [](EndOfStreamTag){ return false; }, + }, packet); +} + bool Client::Impl::ReadBlock(InputStream& input, Block* block) { // Additional information about block. if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { @@ -793,8 +859,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) { return true; } -bool Client::Impl::ReceiveData() { - Block block; +bool Client::Impl::ReceiveData(Block * block) { if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { if (!WireFormat::SkipString(*input_)) { @@ -804,18 +869,18 @@ bool Client::Impl::ReceiveData() { if (compression_ == CompressionState::Enable) { CompressedInput compressed(input_.get()); - if (!ReadBlock(compressed, &block)) { + if (!ReadBlock(compressed, block)) { return false; } } else { - if (!ReadBlock(*input_, &block)) { + if (!ReadBlock(*input_, block)) { return false; } } if (events_) { - events_->OnData(block); - if (!events_->OnDataCancelable(block)) { + events_->OnData(*block); + if (!events_->OnDataCancelable(*block)) { SendCancel(); } }