mirror of
https://github.com/php-win-ext/grpc.git
synced 2026-03-25 17:42:16 +01:00
Introduce grpc_byte_buffer_reader_peek and use it for Protobuf parsing.
grpc_byte_buffer_reader_next() copies and references the slice. This
is not always necessary since the caller will not use the slice
after destroying the byte buffer.
A prominent example is the protobuf parser, which
calls grpc_byte_buffer_reader_next() and immediately unrefs the slice
after the call. This ref() and unref() calls can be very expensive
in the hot path.
This commit introduces grpc_byte_buffer_reader_peek() which
essentialy return a pointer to the slice in the buffer, i.e.,
no copies, and no refs.
QPS of 1MiB 1 Channel callback benchmark increases by 5%.
More importantly insructions per cycle is increased by 10%.
Also add tests and benchmarks for byte_buffer_reader_peek()
This commit reaplies 509e77a5a3
This commit is contained in:
1
grpc.def
1
grpc.def
@@ -149,6 +149,7 @@ EXPORTS
|
||||
grpc_byte_buffer_reader_init
|
||||
grpc_byte_buffer_reader_destroy
|
||||
grpc_byte_buffer_reader_next
|
||||
grpc_byte_buffer_reader_peek
|
||||
grpc_byte_buffer_reader_readall
|
||||
grpc_raw_byte_buffer_from_reader
|
||||
gpr_log_severity_string
|
||||
|
||||
@@ -73,6 +73,19 @@ GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader);
|
||||
GRPCAPI int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice* slice);
|
||||
|
||||
/** EXPERIMENTAL API - This function may be removed and changed, in the future.
|
||||
*
|
||||
* Updates \a slice with the next piece of data from from \a reader and returns
|
||||
* 1. Returns 0 at the end of the stream. Caller is responsible for making sure
|
||||
* the slice pointer remains valid when accessed.
|
||||
*
|
||||
* NOTE: Do not use this function unless the caller can guarantee that the
|
||||
* underlying grpc_byte_buffer outlasts the use of the slice. This is only
|
||||
* safe when the underlying grpc_byte_buffer remains immutable while slice
|
||||
* is being accessed. */
|
||||
GRPCAPI int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice** slice);
|
||||
|
||||
/** Merge all data from \a reader into single slice */
|
||||
GRPCAPI grpc_slice
|
||||
grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader);
|
||||
|
||||
@@ -85,6 +85,8 @@ class CoreCodegen final : public CoreCodegenInterface {
|
||||
grpc_byte_buffer_reader* reader) override;
|
||||
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice* slice) override;
|
||||
int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice** slice) override;
|
||||
|
||||
grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
|
||||
size_t nslices) override;
|
||||
|
||||
@@ -92,6 +92,8 @@ class CoreCodegenInterface {
|
||||
grpc_byte_buffer_reader* reader) = 0;
|
||||
virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice* slice) = 0;
|
||||
virtual int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice** slice) = 0;
|
||||
|
||||
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
|
||||
size_t nslices) = 0;
|
||||
|
||||
@@ -73,7 +73,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
|
||||
}
|
||||
/// If we have backed up previously, we need to return the backed-up slice
|
||||
if (backup_count_ > 0) {
|
||||
*data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) -
|
||||
*data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) -
|
||||
backup_count_;
|
||||
GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX);
|
||||
*size = (int)backup_count_;
|
||||
@@ -81,15 +81,14 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
|
||||
return true;
|
||||
}
|
||||
/// Otherwise get the next slice from the byte buffer reader
|
||||
if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_,
|
||||
if (!g_core_codegen_interface->grpc_byte_buffer_reader_peek(&reader_,
|
||||
&slice_)) {
|
||||
return false;
|
||||
}
|
||||
g_core_codegen_interface->grpc_slice_unref(slice_);
|
||||
*data = GRPC_SLICE_START_PTR(slice_);
|
||||
*data = GRPC_SLICE_START_PTR(*slice_);
|
||||
// On win x64, int is only 32bit
|
||||
GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
|
||||
byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_);
|
||||
GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX);
|
||||
byte_count_ += * size = (int)GRPC_SLICE_LENGTH(*slice_);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -100,7 +99,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
|
||||
/// bytes that have already been returned by the last call of Next.
|
||||
/// So do the backup and have that ready for a later Next.
|
||||
void BackUp(int count) override {
|
||||
GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
|
||||
GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(*slice_)));
|
||||
backup_count_ = count;
|
||||
}
|
||||
|
||||
@@ -135,14 +134,15 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
|
||||
int64_t backup_count() { return backup_count_; }
|
||||
void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; }
|
||||
grpc_byte_buffer_reader* reader() { return &reader_; }
|
||||
grpc_slice* slice() { return &slice_; }
|
||||
grpc_slice* slice() { return slice_; }
|
||||
grpc_slice** mutable_slice_ptr() { return &slice_; }
|
||||
|
||||
private:
|
||||
int64_t byte_count_; ///< total bytes read since object creation
|
||||
int64_t backup_count_; ///< how far backed up in the stream we are
|
||||
grpc_byte_buffer_reader reader_; ///< internal object to read \a grpc_slice
|
||||
///< from the \a grpc_byte_buffer
|
||||
grpc_slice slice_; ///< current slice passed back to the caller
|
||||
grpc_slice* slice_; ///< current slice passed back to the caller
|
||||
Status status_; ///< status of the entire object
|
||||
};
|
||||
|
||||
|
||||
@@ -91,6 +91,23 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {
|
||||
}
|
||||
}
|
||||
|
||||
int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice** slice) {
|
||||
switch (reader->buffer_in->type) {
|
||||
case GRPC_BB_RAW: {
|
||||
grpc_slice_buffer* slice_buffer;
|
||||
slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
|
||||
if (reader->current.index < slice_buffer->count) {
|
||||
*slice = &slice_buffer->slices[reader->current.index];
|
||||
reader->current.index += 1;
|
||||
return 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice* slice) {
|
||||
switch (reader->buffer_in->type) {
|
||||
|
||||
@@ -139,6 +139,11 @@ int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
|
||||
return ::grpc_byte_buffer_reader_next(reader, slice);
|
||||
}
|
||||
|
||||
int CoreCodegen::grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
|
||||
grpc_slice** slice) {
|
||||
return ::grpc_byte_buffer_reader_peek(reader, slice);
|
||||
}
|
||||
|
||||
grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice,
|
||||
size_t nslices) {
|
||||
return ::grpc_raw_byte_buffer_create(slice, nslices);
|
||||
|
||||
@@ -172,6 +172,7 @@ grpc_byte_buffer_destroy_type grpc_byte_buffer_destroy_import;
|
||||
grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import;
|
||||
grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_import;
|
||||
grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import;
|
||||
grpc_byte_buffer_reader_peek_type grpc_byte_buffer_reader_peek_import;
|
||||
grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import;
|
||||
grpc_raw_byte_buffer_from_reader_type grpc_raw_byte_buffer_from_reader_import;
|
||||
gpr_log_severity_string_type gpr_log_severity_string_import;
|
||||
@@ -440,6 +441,7 @@ void grpc_rb_load_imports(HMODULE library) {
|
||||
grpc_byte_buffer_reader_init_import = (grpc_byte_buffer_reader_init_type) GetProcAddress(library, "grpc_byte_buffer_reader_init");
|
||||
grpc_byte_buffer_reader_destroy_import = (grpc_byte_buffer_reader_destroy_type) GetProcAddress(library, "grpc_byte_buffer_reader_destroy");
|
||||
grpc_byte_buffer_reader_next_import = (grpc_byte_buffer_reader_next_type) GetProcAddress(library, "grpc_byte_buffer_reader_next");
|
||||
grpc_byte_buffer_reader_peek_import = (grpc_byte_buffer_reader_peek_type) GetProcAddress(library, "grpc_byte_buffer_reader_peek");
|
||||
grpc_byte_buffer_reader_readall_import = (grpc_byte_buffer_reader_readall_type) GetProcAddress(library, "grpc_byte_buffer_reader_readall");
|
||||
grpc_raw_byte_buffer_from_reader_import = (grpc_raw_byte_buffer_from_reader_type) GetProcAddress(library, "grpc_raw_byte_buffer_from_reader");
|
||||
gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string");
|
||||
|
||||
@@ -491,6 +491,9 @@ extern grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_impo
|
||||
typedef int(*grpc_byte_buffer_reader_next_type)(grpc_byte_buffer_reader* reader, grpc_slice* slice);
|
||||
extern grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import;
|
||||
#define grpc_byte_buffer_reader_next grpc_byte_buffer_reader_next_import
|
||||
typedef int(*grpc_byte_buffer_reader_peek_type)(grpc_byte_buffer_reader* reader, grpc_slice** slice);
|
||||
extern grpc_byte_buffer_reader_peek_type grpc_byte_buffer_reader_peek_import;
|
||||
#define grpc_byte_buffer_reader_peek grpc_byte_buffer_reader_peek_import
|
||||
typedef grpc_slice(*grpc_byte_buffer_reader_readall_type)(grpc_byte_buffer_reader* reader);
|
||||
extern grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import;
|
||||
#define grpc_byte_buffer_reader_readall grpc_byte_buffer_reader_readall_import
|
||||
|
||||
@@ -101,6 +101,73 @@ static void test_read_none_compressed_slice(void) {
|
||||
grpc_byte_buffer_destroy(buffer);
|
||||
}
|
||||
|
||||
static void test_peek_one_slice(void) {
|
||||
grpc_slice slice;
|
||||
grpc_byte_buffer* buffer;
|
||||
grpc_byte_buffer_reader reader;
|
||||
grpc_slice* first_slice;
|
||||
grpc_slice* second_slice;
|
||||
int first_code, second_code;
|
||||
|
||||
LOG_TEST("test_peek_one_slice");
|
||||
slice = grpc_slice_from_copied_string("test");
|
||||
buffer = grpc_raw_byte_buffer_create(&slice, 1);
|
||||
grpc_slice_unref(slice);
|
||||
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
|
||||
"Couldn't init byte buffer reader");
|
||||
first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice);
|
||||
GPR_ASSERT(first_code != 0);
|
||||
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0);
|
||||
second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice);
|
||||
GPR_ASSERT(second_code == 0);
|
||||
grpc_byte_buffer_destroy(buffer);
|
||||
}
|
||||
|
||||
static void test_peek_one_slice_malloc(void) {
|
||||
grpc_slice slice;
|
||||
grpc_byte_buffer* buffer;
|
||||
grpc_byte_buffer_reader reader;
|
||||
grpc_slice* first_slice;
|
||||
grpc_slice* second_slice;
|
||||
int first_code, second_code;
|
||||
|
||||
LOG_TEST("test_peek_one_slice_malloc");
|
||||
slice = grpc_slice_malloc(4);
|
||||
memcpy(GRPC_SLICE_START_PTR(slice), "test", 4);
|
||||
buffer = grpc_raw_byte_buffer_create(&slice, 1);
|
||||
grpc_slice_unref(slice);
|
||||
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
|
||||
"Couldn't init byte buffer reader");
|
||||
first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice);
|
||||
GPR_ASSERT(first_code != 0);
|
||||
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0);
|
||||
second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice);
|
||||
GPR_ASSERT(second_code == 0);
|
||||
grpc_byte_buffer_destroy(buffer);
|
||||
}
|
||||
|
||||
static void test_peek_none_compressed_slice(void) {
|
||||
grpc_slice slice;
|
||||
grpc_byte_buffer* buffer;
|
||||
grpc_byte_buffer_reader reader;
|
||||
grpc_slice* first_slice;
|
||||
grpc_slice* second_slice;
|
||||
int first_code, second_code;
|
||||
|
||||
LOG_TEST("test_peek_none_compressed_slice");
|
||||
slice = grpc_slice_from_copied_string("test");
|
||||
buffer = grpc_raw_byte_buffer_create(&slice, 1);
|
||||
grpc_slice_unref(slice);
|
||||
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
|
||||
"Couldn't init byte buffer reader");
|
||||
first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice);
|
||||
GPR_ASSERT(first_code != 0);
|
||||
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0);
|
||||
second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice);
|
||||
GPR_ASSERT(second_code == 0);
|
||||
grpc_byte_buffer_destroy(buffer);
|
||||
}
|
||||
|
||||
static void test_read_corrupted_slice(void) {
|
||||
grpc_slice slice;
|
||||
grpc_byte_buffer* buffer;
|
||||
@@ -271,6 +338,9 @@ int main(int argc, char** argv) {
|
||||
test_read_one_slice();
|
||||
test_read_one_slice_malloc();
|
||||
test_read_none_compressed_slice();
|
||||
test_peek_one_slice();
|
||||
test_peek_one_slice_malloc();
|
||||
test_peek_none_compressed_slice();
|
||||
test_read_gzip_compressed_slice();
|
||||
test_read_deflate_compressed_slice();
|
||||
test_read_corrupted_slice();
|
||||
|
||||
@@ -209,6 +209,7 @@ int main(int argc, char **argv) {
|
||||
printf("%lx", (unsigned long) grpc_byte_buffer_reader_init);
|
||||
printf("%lx", (unsigned long) grpc_byte_buffer_reader_destroy);
|
||||
printf("%lx", (unsigned long) grpc_byte_buffer_reader_next);
|
||||
printf("%lx", (unsigned long) grpc_byte_buffer_reader_peek);
|
||||
printf("%lx", (unsigned long) grpc_byte_buffer_reader_readall);
|
||||
printf("%lx", (unsigned long) grpc_raw_byte_buffer_from_reader);
|
||||
printf("%lx", (unsigned long) gpr_log_severity_string);
|
||||
|
||||
@@ -29,9 +29,8 @@
|
||||
namespace grpc {
|
||||
namespace testing {
|
||||
|
||||
auto& force_library_initialization = Library::get();
|
||||
|
||||
static void BM_ByteBuffer_Copy(benchmark::State& state) {
|
||||
Library::get();
|
||||
int num_slices = state.range(0);
|
||||
size_t slice_size = state.range(1);
|
||||
std::vector<grpc::Slice> slices;
|
||||
@@ -48,6 +47,74 @@ static void BM_ByteBuffer_Copy(benchmark::State& state) {
|
||||
}
|
||||
BENCHMARK(BM_ByteBuffer_Copy)->Ranges({{1, 64}, {1, 1024 * 1024}});
|
||||
|
||||
static void BM_ByteBufferReader_Next(benchmark::State& state) {
|
||||
Library::get();
|
||||
const int num_slices = state.range(0);
|
||||
constexpr size_t kSliceSize = 16;
|
||||
std::vector<grpc_slice> slices;
|
||||
for (int i = 0; i < num_slices; ++i) {
|
||||
std::unique_ptr<char[]> buf(new char[kSliceSize]);
|
||||
slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer(
|
||||
buf.get(), kSliceSize));
|
||||
}
|
||||
grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create(
|
||||
slices.data(), num_slices);
|
||||
grpc_byte_buffer_reader reader;
|
||||
GPR_ASSERT(
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
|
||||
while (state.KeepRunning()) {
|
||||
grpc_slice* slice;
|
||||
if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek(
|
||||
&reader, &slice))) {
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
|
||||
GPR_ASSERT(
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
|
||||
g_core_codegen_interface->grpc_byte_buffer_destroy(bb);
|
||||
for (auto& slice : slices) {
|
||||
g_core_codegen_interface->grpc_slice_unref(slice);
|
||||
}
|
||||
}
|
||||
BENCHMARK(BM_ByteBufferReader_Next)->Ranges({{64 * 1024, 1024 * 1024}});
|
||||
|
||||
static void BM_ByteBufferReader_Peek(benchmark::State& state) {
|
||||
Library::get();
|
||||
const int num_slices = state.range(0);
|
||||
constexpr size_t kSliceSize = 16;
|
||||
std::vector<grpc_slice> slices;
|
||||
for (int i = 0; i < num_slices; ++i) {
|
||||
std::unique_ptr<char[]> buf(new char[kSliceSize]);
|
||||
slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer(
|
||||
buf.get(), kSliceSize));
|
||||
}
|
||||
grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create(
|
||||
slices.data(), num_slices);
|
||||
grpc_byte_buffer_reader reader;
|
||||
GPR_ASSERT(
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
|
||||
while (state.KeepRunning()) {
|
||||
grpc_slice* slice;
|
||||
if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek(
|
||||
&reader, &slice))) {
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
|
||||
GPR_ASSERT(
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
|
||||
g_core_codegen_interface->grpc_byte_buffer_destroy(bb);
|
||||
for (auto& slice : slices) {
|
||||
g_core_codegen_interface->grpc_slice_unref(slice);
|
||||
}
|
||||
}
|
||||
BENCHMARK(BM_ByteBufferReader_Peek)->Ranges({{64 * 1024, 1024 * 1024}});
|
||||
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
|
||||
Reference in New Issue
Block a user