Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 131 additions & 66 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <assert.h>
#include <system_error>
#include <variant>
#include <vector>
#include <sstream>

Expand Down Expand Up @@ -146,6 +147,51 @@ std::unique_ptr<EndpointsIteratorBase> GetEndpointsIterator(const ClientOptions&

}


template<class... Ts>
struct overloaded : Ts... { using Ts::operator()...; };
template<class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;

struct DataTag {
Block block;
};
struct ExceptionTag {
};
struct ProfileTag {
Profile profile;
};
struct ProgressTag {
Progress progress;
};
struct PongTag {
};
struct HelloTag {
};
struct LogTag {
Block block;
};
struct TableColumnsTag {
};
struct ProfileEventsTag {
Block block;
};
struct EndOfStreamTag {
};

using EncodedPacket = std::variant<
std::monostate,
DataTag,
ExceptionTag,
ProfileTag,
ProgressTag,
PongTag,
HelloTag,
LogTag,
TableColumnsTag,
ProfileEventsTag,
EndOfStreamTag>;

class Client::Impl {
public:
Impl(const ClientOptions& opts);
Expand Down Expand Up @@ -180,7 +226,8 @@ class Client::Impl {
private:
bool Handshake();

bool ReceivePacket(uint64_t* server_packet = nullptr);
EncodedPacket ReceivePacket(uint64_t* server_packet = nullptr);
bool ProcessPacket(uint64_t* server_packet = nullptr);

void SendQuery(const Query& query, bool finalize = true);
void FinalizeQuery();
Expand All @@ -197,7 +244,7 @@ class Client::Impl {
bool ReceiveHello();

/// Reads data packet form input stream.
bool ReceiveData();
bool ReceiveData(Block * block);

/// Reads exception packet form input stream.
bool ReceiveException(bool rethrow = false);
Expand Down Expand Up @@ -308,7 +355,7 @@ void Client::Impl::ExecuteQuery(Query query) {

SendQuery(query);

while (ReceivePacket()) {
while (ProcessPacket()) {
;
}
}
Expand All @@ -333,7 +380,7 @@ void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& ext
SendExternalData(external_tables);
FinalizeQuery();

while (ReceivePacket()) {
while (ProcessPacket()) {
;
}
}
Expand Down Expand Up @@ -408,7 +455,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer

// Wait for a data packet and return
uint64_t server_packet = 0;
while (ReceivePacket(&server_packet)) {
while (ProcessPacket(&server_packet)) {
if (server_packet == ServerCodes::Data) {
SendData(block);
EndInsert();
Expand Down Expand Up @@ -443,7 +490,7 @@ Block Client::Impl::BeginInsert(Query query) {

// Wait for a data packet and return
uint64_t server_packet = 0;
while (ReceivePacket(&server_packet)) {
while (ProcessPacket(&server_packet)) {
if (server_packet == ServerCodes::Data) {
return block;
}
Expand All @@ -470,7 +517,7 @@ void Client::Impl::EndInsert() {

// Wait for EOS.
uint64_t eos_packet{0};
while (ReceivePacket(&eos_packet)) {
while (ProcessPacket(&eos_packet)) {
;
}

Expand All @@ -491,7 +538,7 @@ void Client::Impl::Ping() {
output_->Flush();

uint64_t server_packet;
const bool ret = ReceivePacket(&server_packet);
const bool ret = ProcessPacket(&server_packet);

if (!ret || server_packet != ServerCodes::Pong) {
throw ProtocolError("fail to ping server");
Expand Down Expand Up @@ -569,157 +616,176 @@ bool Client::Impl::Handshake() {
return true;
}

bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
EncodedPacket Client::Impl::ReceivePacket(uint64_t* server_packet) {
uint64_t packet_type = 0;

if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
return false;
return {};
}
if (server_packet) {
*server_packet = packet_type;
}

switch (packet_type) {

case ServerCodes::Data: {
if (!ReceiveData()) {
DataTag ret{};
if (!ReceiveData(&ret.block)) {
throw ProtocolError("can't read data packet from input stream");
}
return true;
return ret;
}

case ServerCodes::Exception: {
ExceptionTag ret{};
ReceiveException();
return false;
return ret;
}

case ServerCodes::ProfileInfo: {
Profile profile;
ProfileTag ret{};

if (!WireFormat::ReadUInt64(*input_, &profile.rows)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.profile.rows)) {
return {};
}
if (!WireFormat::ReadUInt64(*input_, &profile.blocks)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.profile.blocks)) {
return {};
}
if (!WireFormat::ReadUInt64(*input_, &profile.bytes)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.profile.bytes)) {
return {};
}
if (!WireFormat::ReadFixed(*input_, &profile.applied_limit)) {
return false;
if (!WireFormat::ReadFixed(*input_, &ret.profile.applied_limit)) {
return {};
}
if (!WireFormat::ReadUInt64(*input_, &profile.rows_before_limit)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.profile.rows_before_limit)) {
return {};
}
if (!WireFormat::ReadFixed(*input_, &profile.calculated_rows_before_limit)) {
return false;
if (!WireFormat::ReadFixed(*input_, &ret.profile.calculated_rows_before_limit)) {
return {};
}

if (events_) {
events_->OnProfile(profile);
events_->OnProfile(ret.profile);
}

return true;
return ret;
}

case ServerCodes::Progress: {
Progress info;
ProgressTag ret;

if (!WireFormat::ReadUInt64(*input_, &info.rows)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.progress.rows)) {
return {};
}
if (!WireFormat::ReadUInt64(*input_, &info.bytes)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.progress.bytes)) {
return {};
}
if constexpr(DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) {
if (!WireFormat::ReadUInt64(*input_, &info.total_rows)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.progress.total_rows)) {
return {};
}
}
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
{
if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.progress.written_rows)) {
return {};
}
if (!WireFormat::ReadUInt64(*input_, &info.written_bytes)) {
return false;
if (!WireFormat::ReadUInt64(*input_, &ret.progress.written_bytes)) {
return {};
}
}

if (events_) {
events_->OnProgress(info);
events_->OnProgress(ret.progress);
}

return true;
return ret;
}

case ServerCodes::Pong: {
return true;
return PongTag{};
}

case ServerCodes::Hello: {
return true;
return HelloTag{};
}

case ServerCodes::EndOfStream: {
if (events_) {
events_->OnFinish();
}
return false;
return EndOfStreamTag{};
}

case ServerCodes::Log: {
// log tag
if (!WireFormat::SkipString(*input_)) {
return false;
return {};
}
Block block;
LogTag ret;

// Use uncompressed stream since log blocks usually contain only one row
if (!ReadBlock(*input_, &block)) {
return false;
if (!ReadBlock(*input_, &ret.block)) {
return {};
}

if (events_) {
events_->OnServerLog(block);
events_->OnServerLog(ret.block);
}
return true;
return ret;
}

case ServerCodes::TableColumns: {
// external table name
if (!WireFormat::SkipString(*input_)) {
return false;
return {};
}

// columns metadata
if (!WireFormat::SkipString(*input_)) {
return false;
return {};
}
return true;
return TableColumnsTag{};
}

case ServerCodes::ProfileEvents: {
if (!WireFormat::SkipString(*input_)) {
return false;
return {};
}

Block block;
if (!ReadBlock(*input_, &block)) {
return false;
ProfileEventsTag ret;
if (!ReadBlock(*input_, &ret.block)) {
return {};
}

if (events_) {
events_->OnProfileEvents(block);
events_->OnProfileEvents(ret.block);
}
return true;
return ret;
}

default:
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
break;
}
}

bool Client::Impl::ProcessPacket(uint64_t* server_packet) {
auto packet = ReceivePacket(server_packet);
return std::visit(overloaded{
[](std::monostate) { return false; },
[](DataTag) { return true; },
[](ExceptionTag) {return false; },
[](ProfileTag){ return true; },
[](ProgressTag){ return true; },
[](PongTag){ return true; },
[](HelloTag){ return true; },
[](LogTag){ return true; },
[](TableColumnsTag){ return true; },
[](ProfileEventsTag){ return true; },
[](EndOfStreamTag){ return false; },
}, packet);
}

bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
// Additional information about block.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
Expand Down Expand Up @@ -793,8 +859,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
return true;
}

bool Client::Impl::ReceiveData() {
Block block;
bool Client::Impl::ReceiveData(Block * block) {

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
if (!WireFormat::SkipString(*input_)) {
Expand All @@ -804,18 +869,18 @@ bool Client::Impl::ReceiveData() {

if (compression_ == CompressionState::Enable) {
CompressedInput compressed(input_.get());
if (!ReadBlock(compressed, &block)) {
if (!ReadBlock(compressed, block)) {
return false;
}
} else {
if (!ReadBlock(*input_, &block)) {
if (!ReadBlock(*input_, block)) {
return false;
}
}

if (events_) {
events_->OnData(block);
if (!events_->OnDataCancelable(block)) {
events_->OnData(*block);
if (!events_->OnDataCancelable(*block)) {
SendCancel();
}
}
Expand Down
Loading