[PromiseFilters] Add API changes

PiperOrigin-RevId: 859131244
This commit is contained in:
Akshit Patel
2026-01-21 09:16:49 -08:00
committed by Copybara-Service
parent 61fe9b40a9
commit e22d8dea12
6 changed files with 41 additions and 19 deletions

1
BUILD
View File

@@ -5106,6 +5106,7 @@ grpc_cc_library(
"//src/core:json",
"//src/core:match",
"//src/core:memory_quota",
"//src/core:metadata",
"//src/core:metadata_batch",
"//src/core:metadata_info",
"//src/core:notification",

View File

@@ -44,6 +44,7 @@
#include <variant>
#include <vector>
#include "src/core/call/metadata.h"
#include "src/core/call/metadata_batch.h"
#include "src/core/call/metadata_info.h"
#include "src/core/channelz/property_list.h"
@@ -169,8 +170,9 @@ static void read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
static void continue_read_action_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error, bool tarpit);
static void close_from_api(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_error_handle error,
bool tarpit, grpc_core::ServerMetadataHandle send_trailing_metadata);
// Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t);
@@ -1842,8 +1844,10 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->cancel_stream) {
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error,
op_payload->cancel_stream.tarpit);
grpc_chttp2_cancel_stream(
t, s, op_payload->cancel_stream.cancel_error,
op_payload->cancel_stream.tarpit,
std::move(op_payload->cancel_stream.send_trailing_metadata));
}
if (op->send_initial_metadata) {
@@ -2456,12 +2460,15 @@ void MaybeTarpit(grpc_chttp2_transport* t, bool tarpit, F fn) {
} // namespace
} // namespace grpc_core
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error, bool tarpit) {
void grpc_chttp2_cancel_stream(
grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error, bool tarpit,
grpc_core::ServerMetadataHandle send_trailing_metadata) {
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_has_clear_grpc_status(due_to_error) &&
!(s->read_closed && s->write_closed)) {
close_from_api(t, s, due_to_error, tarpit);
close_from_api(t, s, due_to_error, tarpit,
std::move(send_trailing_metadata));
return;
}
@@ -2650,8 +2657,10 @@ grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
return rsh;
}
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error, bool tarpit) {
static void close_from_api(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_error_handle error,
bool tarpit,
GRPC_UNUSED grpc_core::ServerMetadataHandle send_trailing_metadata) {
grpc_status_code grpc_status;
std::string message;
grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,

View File

@@ -34,6 +34,7 @@
#include <utility>
#include <variant>
#include "src/core/call/metadata.h"
#include "src/core/call/metadata_batch.h"
#include "src/core/channelz/channelz.h"
#include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
@@ -892,8 +893,10 @@ void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);
void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error, bool tarpit);
void grpc_chttp2_cancel_stream(
grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error, bool tarpit,
grpc_core::ServerMetadataHandle send_trailing_metadata = nullptr);
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);

View File

@@ -23,6 +23,7 @@
#include <utility>
#include <vector>
#include "src/core/call/metadata.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/error.h"
@@ -2040,6 +2041,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
!batch->recv_trailing_metadata);
PollContext poll_ctx(this, &flusher);
Completed(batch->payload->cancel_stream.cancel_error,
std::move(batch->payload->cancel_stream.send_trailing_metadata),
batch->payload->cancel_stream.tarpit, &flusher);
if (is_last()) {
batch.CompleteWith(&flusher);
@@ -2159,8 +2161,9 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
}
// Handle cancellation.
void ServerCallData::Completed(grpc_error_handle error,
bool tarpit_cancellation, Flusher* flusher) {
void ServerCallData::Completed(
grpc_error_handle error, GRPC_UNUSED ServerMetadataHandle trailing_metadata,
bool tarpit_cancellation, Flusher* flusher) {
GRPC_TRACE_VLOG(channel, 2)
<< LogTag() << "ServerCallData::Completed: send_trailing_state="
<< StateString(send_trailing_state_) << " send_initial_state="
@@ -2319,8 +2322,8 @@ void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
<< " md=" << recv_trailing_metadata_->DebugString();
Flusher flusher(this);
PollContext poll_ctx(this, &flusher);
Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(),
&flusher);
Completed(error, /*trailing_metadata=*/nullptr,
recv_trailing_metadata_->get(GrpcTarPit()).has_value(), &flusher);
flusher.AddClosure(original_recv_trailing_metadata_ready_, std::move(error),
"continue recv trailing");
}
@@ -2527,8 +2530,8 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
break;
case SendTrailingState::kInitial: {
GRPC_CHECK(*md->get_pointer(GrpcStatusMetadata()) != GRPC_STATUS_OK);
Completed(StatusFromMetadata(*md), md->get(GrpcTarPit()).has_value(),
flusher);
Completed(StatusFromMetadata(*md), /*trailing_metadata=*/nullptr,
md->get(GrpcTarPit()).has_value(), flusher);
} break;
case SendTrailingState::kCancelled:
// Nothing to do.

View File

@@ -1843,8 +1843,9 @@ class ServerCallData : public BaseCallData {
struct SendInitialMetadata;
// Shut things down when the call completes.
void Completed(grpc_error_handle error, bool tarpit_cancellation,
Flusher* flusher);
void Completed(grpc_error_handle error,
ServerMetadataHandle trailing_metadata,
bool tarpit_cancellation, Flusher* flusher);
// Construct a promise that will "call" the next filter.
// Effectively:
// - put the modified initial metadata into the batch being sent up.

View File

@@ -409,6 +409,11 @@ struct grpc_transport_stream_op_batch_payload {
// This should be set for cancellations that result from malformed client
// initial metadata.
bool tarpit = false;
// Server-side only: If non-null, the transport sends this trailing
// metadata to the client.
// NOTE: This metadata bypasses subsequent filters and is sent directly
// to the client. Ensure it contains only fields intended for the client.
grpc_core::ServerMetadataHandle send_trailing_metadata = nullptr;
} cancel_stream;
};