#pragma once #include "envoy/access_log/access_log.h" #include "envoy/api/api.h" #include "envoy/common/random_generator.h" #include "envoy/config/core/v3/health_check.pb.h" #include "envoy/data/core/v3/health_check_event.pb.h" #include "envoy/grpc/status.h" #include "envoy/type/v3/http.pb.h" #include "envoy/type/v3/range.pb.h" #include "common/common/logger.h" #include "common/grpc/codec.h" #include "common/http/codec_client.h" #include "common/router/header_parser.h" #include "common/stream_info/stream_info_impl.h" #include "common/upstream/health_checker_base_impl.h" #include "src/proto/grpc/health/v1/health.pb.h" namespace Envoy { namespace Upstream { /** * Factory for creating health checker implementations. */ class HealthCheckerFactory : public Logger::Loggable { public: /** * Create a health checker. * @param health_check_config supplies the health check proto. * @param cluster supplies the owning cluster. * @param runtime supplies the runtime loader. * @param dispatcher supplies the dispatcher. * @param log_manager supplies the log_manager. * @param validation_visitor message validation visitor instance. * @param api reference to the Api object * @return a health checker. */ static HealthCheckerSharedPtr create(const envoy::config::core::v3::HealthCheck& health_check_config, Upstream::Cluster& cluster, Runtime::Loader& runtime, Event::Dispatcher& dispatcher, AccessLog::AccessLogManager& log_manager, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api); }; /** * HTTP health checker implementation. Connection keep alive is used where possible. */ class HttpHealthCheckerImpl : public HealthCheckerImplBase { public: HttpHealthCheckerImpl(const Cluster& cluster, const envoy::config::core::v3::HealthCheck& config, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Random::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger); /** * Utility class checking if given http status matches configured expectations. */ class HttpStatusChecker { public: HttpStatusChecker( const Protobuf::RepeatedPtrField& expected_statuses, uint64_t default_expected_status); bool inRange(uint64_t http_status) const; private: std::vector> ranges_; }; private: struct HttpActiveHealthCheckSession : public ActiveHealthCheckSession, public Http::ResponseDecoder, public Http::StreamCallbacks { HttpActiveHealthCheckSession(HttpHealthCheckerImpl& parent, const HostSharedPtr& host); ~HttpActiveHealthCheckSession() override; void onResponseComplete(); enum class HealthCheckResult { Succeeded, Degraded, Failed }; HealthCheckResult healthCheckResult(); bool shouldClose() const; // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; void onDeferredDelete() final; // Http::StreamDecoder void decodeData(Buffer::Instance&, bool end_stream) override { if (end_stream) { onResponseComplete(); } } void decodeMetadata(Http::MetadataMapPtr&&) override {} // Http::ResponseDecoder void decode100ContinueHeaders(Http::ResponseHeaderMapPtr&&) override {} void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; void decodeTrailers(Http::ResponseTrailerMapPtr&&) override { onResponseComplete(); } // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason, absl::string_view transport_failure_reason) override; void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} void onEvent(Network::ConnectionEvent event); class ConnectionCallbackImpl : public Network::ConnectionCallbacks { public: ConnectionCallbackImpl(HttpActiveHealthCheckSession& parent) : parent_(parent) {} // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} private: HttpActiveHealthCheckSession& parent_; }; ConnectionCallbackImpl connection_callback_impl_{*this}; HttpHealthCheckerImpl& parent_; Http::CodecClientPtr client_; Http::ResponseHeaderMapPtr response_headers_; const std::string& hostname_; const Http::Protocol protocol_; Network::Address::InstanceConstSharedPtr local_address_; bool expect_reset_{}; }; using HttpActiveHealthCheckSessionPtr = std::unique_ptr; virtual Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData& data) PURE; // HealthCheckerImplBase ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { return std::make_unique(*this, host); } envoy::data::core::v3::HealthCheckerType healthCheckerType() const override { return envoy::data::core::v3::HTTP; } Http::CodecClient::Type codecClientType(const envoy::type::v3::CodecClientType& type); const std::string path_; const std::string host_value_; absl::optional service_name_matcher_; Router::HeaderParserPtr request_headers_parser_; const HttpStatusChecker http_status_checker_; protected: const Http::CodecClient::Type codec_client_type_; Random::RandomGenerator& random_generator_; }; /** * Production implementation of the HTTP health checker that allocates a real codec client. */ class ProdHttpHealthCheckerImpl : public HttpHealthCheckerImpl { public: using HttpHealthCheckerImpl::HttpHealthCheckerImpl; // HttpHealthCheckerImpl Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData& data) override; }; /** * Utility class for loading a binary health checking config and matching it against a buffer. * Split out for ease of testing. The type of matching performed is the following (this is the * MongoDB health check request and response): * * "send": [ {"binary": "39000000"}, {"binary": "EEEEEEEE"}, {"binary": "00000000"}, {"binary": "d4070000"}, {"binary": "00000000"}, {"binary": "746573742e"}, {"binary": "24636d6400"}, {"binary": "00000000"}, {"binary": "FFFFFFFF"}, {"binary": "13000000"}, {"binary": "01"}, {"binary": "70696e6700"}, {"binary": "000000000000f03f"}, {"binary": "00"} ], "receive": [ {"binary": "EEEEEEEE"}, {"binary": "01000000"}, {"binary": "00000000"}, {"binary": "0000000000000000"}, {"binary": "00000000"}, {"binary": "11000000"}, {"binary": "01"}, {"binary": "6f6b"}, {"binary": "00000000000000f03f"}, {"binary": "00"} ] * * During each health check cycle, all of the "send" bytes are sent to the target server. Each * binary block can be of arbitrary length and is just concatenated together when sent. * * On the receive side, "fuzzy" matching is performed such that each binary block must be found, * and in the order specified, but not necessary contiguous. Thus, in the example above, * "FFFFFFFF" could be inserted in the response between "EEEEEEEE" and "01000000" and the check * would still pass. */ class TcpHealthCheckMatcher { public: using MatchSegments = std::list>; static MatchSegments loadProtoBytes( const Protobuf::RepeatedPtrField& byte_array); static bool match(const MatchSegments& expected, const Buffer::Instance& buffer); }; /** * TCP health checker implementation. */ class TcpHealthCheckerImpl : public HealthCheckerImplBase { public: TcpHealthCheckerImpl(const Cluster& cluster, const envoy::config::core::v3::HealthCheck& config, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Random::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger); private: struct TcpActiveHealthCheckSession; struct TcpSessionCallbacks : public Network::ConnectionCallbacks, public Network::ReadFilterBaseImpl { TcpSessionCallbacks(TcpActiveHealthCheckSession& parent) : parent_(parent) {} // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data, bool) override { parent_.onData(data); return Network::FilterStatus::StopIteration; } TcpActiveHealthCheckSession& parent_; }; struct TcpActiveHealthCheckSession : public ActiveHealthCheckSession { TcpActiveHealthCheckSession(TcpHealthCheckerImpl& parent, const HostSharedPtr& host) : ActiveHealthCheckSession(parent, host), parent_(parent) {} ~TcpActiveHealthCheckSession() override; void onData(Buffer::Instance& data); void onEvent(Network::ConnectionEvent event); // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; void onDeferredDelete() final; TcpHealthCheckerImpl& parent_; Network::ClientConnectionPtr client_; std::shared_ptr session_callbacks_; // If true, stream close was initiated by us, not e.g. remote close or TCP reset. // In this case healthcheck status already reported, only state cleanup required. bool expect_close_{}; }; using TcpActiveHealthCheckSessionPtr = std::unique_ptr; // HealthCheckerImplBase ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { return std::make_unique(*this, host); } envoy::data::core::v3::HealthCheckerType healthCheckerType() const override { return envoy::data::core::v3::TCP; } const TcpHealthCheckMatcher::MatchSegments send_bytes_; const TcpHealthCheckMatcher::MatchSegments receive_bytes_; }; /** * gRPC health checker implementation. */ class GrpcHealthCheckerImpl : public HealthCheckerImplBase { public: GrpcHealthCheckerImpl(const Cluster& cluster, const envoy::config::core::v3::HealthCheck& config, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Random::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger); private: struct GrpcActiveHealthCheckSession : public ActiveHealthCheckSession, public Http::ResponseDecoder, public Http::StreamCallbacks { GrpcActiveHealthCheckSession(GrpcHealthCheckerImpl& parent, const HostSharedPtr& host); ~GrpcActiveHealthCheckSession() override; void onRpcComplete(Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, bool end_stream); bool isHealthCheckSucceeded(Grpc::Status::GrpcStatus grpc_status) const; void resetState(); void logHealthCheckStatus(Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message); // ActiveHealthCheckSession void onInterval() override; void onTimeout() override; void onDeferredDelete() final; // Http::StreamDecoder void decodeData(Buffer::Instance&, bool end_stream) override; void decodeMetadata(Http::MetadataMapPtr&&) override {} // Http::ResponseDecoder void decode100ContinueHeaders(Http::ResponseHeaderMapPtr&&) override {} void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; void decodeTrailers(Http::ResponseTrailerMapPtr&&) override; // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason, absl::string_view transport_failure_reason) override; void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} void onEvent(Network::ConnectionEvent event); void onGoAway(Http::GoAwayErrorCode error_code); class ConnectionCallbackImpl : public Network::ConnectionCallbacks { public: ConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {} // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} private: GrpcActiveHealthCheckSession& parent_; }; class HttpConnectionCallbackImpl : public Http::ConnectionCallbacks { public: HttpConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {} // Http::ConnectionCallbacks void onGoAway(Http::GoAwayErrorCode error_code) override { parent_.onGoAway(error_code); } private: GrpcActiveHealthCheckSession& parent_; }; ConnectionCallbackImpl connection_callback_impl_{*this}; HttpConnectionCallbackImpl http_connection_callback_impl_{*this}; GrpcHealthCheckerImpl& parent_; Http::CodecClientPtr client_; Http::RequestEncoder* request_encoder_; Grpc::Decoder decoder_; std::unique_ptr health_check_response_; // If true, stream reset was initiated by us (GrpcActiveHealthCheckSession), not by HTTP stack, // e.g. remote reset. In this case healthcheck status has already been reported, only state // cleanup is required. bool expect_reset_ = false; // If true, we received a GOAWAY (NO_ERROR code) and are deferring closing the connection // until the active probe completes. bool received_no_error_goaway_ = false; }; virtual Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE; // HealthCheckerImplBase ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { return std::make_unique(*this, host); } envoy::data::core::v3::HealthCheckerType healthCheckerType() const override { return envoy::data::core::v3::GRPC; } protected: Random::RandomGenerator& random_generator_; private: const Protobuf::MethodDescriptor& service_method_; absl::optional service_name_; absl::optional authority_value_; }; /** * Production implementation of the gRPC health checker that allocates a real codec client. */ class ProdGrpcHealthCheckerImpl : public GrpcHealthCheckerImpl { public: using GrpcHealthCheckerImpl::GrpcHealthCheckerImpl; // GrpcHealthCheckerImpl Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override; }; } // namespace Upstream } // namespace Envoy