#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/grpc/status.h"
#include "envoy/stats/scope.h"

#include "common/config/protobuf_link_hacks.h"
#include "common/protobuf/protobuf.h"
#include "common/protobuf/utility.h"

#include "test/common/grpc/grpc_client_integration.h"
#include "test/integration/http_integration.h"
#include "test/integration/utility.h"
#include "test/test_common/network_utility.h"
#include "test/test_common/resources.h"
#include "test/test_common/simulated_time_system.h"
#include "test/test_common/utility.h"

#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"

using testing::AssertionResult;

namespace Envoy {
namespace {

// Names of the two clusters delivered over CDS. They are the members of the statically
// configured "aggregate_cluster" (see config() below).
const char FirstClusterName[] = "cluster_1";
const char SecondClusterName[] = "cluster_2";

// Index in fake_upstreams_ of the upstreams backing cluster_1 and cluster_2. These are the two
// upstreams appended by AggregateIntegrationTest::initialize() via addFakeUpstream().
const int FirstUpstreamIndex = 2;
const int SecondUpstreamIndex = 3;

// Returns the bootstrap YAML used by every test in this file. The string is built once and
// reused. It declares:
//  - a gRPC CDS config source served by the static cluster "my_cds_cluster",
//  - a static "aggregate_cluster" composed of cluster_1 and cluster_2,
//  - an HTTP/1 listener routing "/cluster1", "/cluster2" and "/aggregatecluster"; the aggregate
//    route carries a previous_priorities retry priority with update_frequency 1.
// The `{}` placeholder is filled with the platform null device path; `{{}}` is an escaped
// literal `{}` for fmt::format.
const std::string& config() {
  CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF(
admin:
  access_log_path: {}
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 0
dynamic_resources:
  cds_config:
    resource_api_version: V3
    api_config_source:
      api_type: GRPC
      grpc_services:
        envoy_grpc:
          cluster_name: my_cds_cluster
      set_node_on_first_message_only: false
static_resources:
  clusters:
  - name: my_cds_cluster
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {{}}
    load_assignment:
      cluster_name: my_cds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 0
  - name: aggregate_cluster
    connect_timeout: 0.25s
    lb_policy: CLUSTER_PROVIDED
    cluster_type:
      name: envoy.clusters.aggregate
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.clusters.aggregate.v3.ClusterConfig
        clusters:
        - cluster_1
        - cluster_2
  listeners:
  - name: http
    address:
      socket_address:
        address: 127.0.0.1
        port_value: 0
    filter_chains:
      filters:
        name: http
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: config_test
          http_filters:
            name: envoy.filters.http.router
          codec_type: HTTP1
          route_config:
            name: route_config_0
            validate_clusters: false
            virtual_hosts:
              name: integration
              routes:
              - route:
                  cluster: cluster_1
                match:
                  prefix: "/cluster1"
              - route:
                  cluster: cluster_2
                match:
                  prefix: "/cluster2"
              - route:
                  cluster: aggregate_cluster
                  retry_policy:
                    retry_priority:
                      name: envoy.retry_priorities.previous_priorities
                      typed_config:
                        "@type": type.googleapis.com/envoy.config.retry.previous_priorities.PreviousPrioritiesConfig
                        update_frequency: 1
                match:
                  prefix: "/aggregatecluster"
              domains: "*"
)EOF",
                                                  Platform::null_device_path));
}

// Integration fixture for the aggregate cluster. It is parameterized on the IP version, which is
// forwarded to HttpIntegrationTest and used to pick the loopback address of the CDS-delivered
// clusters. The downstream speaks HTTP/1; fake_upstreams_[0] plays the CDS server.
class AggregateIntegrationTest : public testing::TestWithParam<Network::Address::IpVersion>,
                                 public HttpIntegrationTest {
public:
  AggregateIntegrationTest()
      : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, GetParam(), config()) {
    use_lds_ = false;
  }

  // Tears down the xDS connection opened by acceptXdsConnection().
  void TearDown() override { cleanUpXdsConnection(); }

  // Starts the test server, appends the two fake upstreams that back cluster_1 and cluster_2,
  // builds both cluster protos, serves cluster_1 over CDS and waits until the listener is ready.
  void initialize() override {
    use_lds_ = false;
    setUpstreamCount(2);                                  // the CDS cluster
    setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); // CDS uses gRPC uses HTTP2.

    defer_listener_finalization_ = true;
    HttpIntegrationTest::initialize();

    // These two land at FirstUpstreamIndex and SecondUpstreamIndex in fake_upstreams_.
    addFakeUpstream(FakeHttpConnection::Type::HTTP2);
    addFakeUpstream(FakeHttpConnection::Type::HTTP2);
    cluster1_ = ConfigHelper::buildStaticCluster(
        FirstClusterName, fake_upstreams_[FirstUpstreamIndex]->localAddress()->ip()->port(),
        Network::Test::getLoopbackAddressString(GetParam()));
    cluster2_ = ConfigHelper::buildStaticCluster(
        SecondClusterName, fake_upstreams_[SecondUpstreamIndex]->localAddress()->ip()->port(),
        Network::Test::getLoopbackAddressString(GetParam()));

    // Let Envoy establish its connection to the CDS server.
    acceptXdsConnection();

    // Do the initial compareDiscoveryRequest / sendDiscoveryResponse for cluster_1.
    EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true));
    sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
                                                               {cluster1_}, {cluster1_}, {}, "55");

    // The '3' is my_cds_cluster, aggregate_cluster and cluster_1.
    test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3);

    // Wait for our statically specified listener to become ready, and register its port in the
    // test framework's downstream listener port map.
    test_server_->waitUntilListenersReady();
    registerTestServerPorts({"http"});
  }

  // Accepts Envoy's connection to the CDS server (fake_upstreams_[0]), waits for the first stream
  // on it and starts the gRPC stream. Aborts via RELEASE_ASSERT if either wait fails.
  void acceptXdsConnection() {
    // xds_connection_ is filled with the new FakeHttpConnection.
    AssertionResult result =
        fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_);
    RELEASE_ASSERT(result, result.message());
    result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_);
    RELEASE_ASSERT(result, result.message());
    xds_stream_->startGrpcStream();
  }

  // Cluster protos for cluster_1 / cluster_2, populated in initialize().
  envoy::config::cluster::v3::Cluster cluster1_;
  envoy::config::cluster::v3::Cluster cluster2_;
};

INSTANTIATE_TEST_SUITE_P(IpVersions, AggregateIntegrationTest,
                         testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));

// Tests that the aggregate route serves traffic while cluster_1 exists, returns 503 once
// cluster_1 is removed, and recovers when cluster_1 is added back.
TEST_P(AggregateIntegrationTest, ClusterUpDownUp) {
  // Calls our initialize(), which includes establishing a listener, route, and cluster.
  testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/aggregatecluster");

  // Tell Envoy that cluster_1 is gone.
  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, {},
                                                             {}, {FirstClusterName}, "42");
  // We can continue the test once we're sure that Envoy's ClusterManager has made use of
  // the DiscoveryResponse that says cluster_1 is gone.
  test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1);

  // Now that cluster_1 is gone, the listener (with its routing to cluster_1) should 503.
  BufferingStreamDecoderPtr response =
      IntegrationUtil::makeSingleRequest(lookupPort("http"), "GET", "/aggregatecluster", "",
                                         downstream_protocol_, version_, "foo.com");
  ASSERT_TRUE(response->complete());
  EXPECT_EQ("503", response->headers().getStatusValue());

  cleanupUpstreamAndDownstream();
  ASSERT_TRUE(codec_client_->waitForDisconnect());

  // Tell Envoy that cluster_1 is back.
  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "42", {}, {}, {}));
  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
                                                             {cluster1_}, {cluster1_}, {}, "413");

  test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3);
  testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/aggregatecluster");

  cleanupUpstreamAndDownstream();
}

// Tests adding a cluster, adding another, then removing the first.
TEST_P(AggregateIntegrationTest, TwoClusters) {
  // Calls our initialize(), which includes establishing a listener, route, and cluster.
  testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/aggregatecluster");

  cleanupUpstreamAndDownstream();
  ASSERT_TRUE(codec_client_->waitForDisconnect());

  // Tell Envoy that cluster_2 is here.
  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster2_}, {}, "42");
  // The '4' includes the fake CDS server and aggregate cluster.
  test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4);

  // A request for aggregate cluster should be fine.
  testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/aggregatecluster");
  cleanupUpstreamAndDownstream();
  ASSERT_TRUE(codec_client_->waitForDisconnect());

  // Tell Envoy that cluster_1 is gone.
  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "42", {}, {}, {}));
  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {cluster2_}, {}, {FirstClusterName}, "43");
  // We can continue the test once we're sure that Envoy's ClusterManager has made use of
  // the DiscoveryResponse that says cluster_1 is gone.
  test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1);

  // With cluster_1 removed, the aggregate route is expected to be served by cluster_2.
  testRouterHeaderOnlyRequestAndResponse(nullptr, SecondUpstreamIndex, "/aggregatecluster");
  cleanupUpstreamAndDownstream();
  ASSERT_TRUE(codec_client_->waitForDisconnect());

  // Tell Envoy that cluster_1 is back.
  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "43", {}, {}, {}));
  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster1_}, {}, "413");

  test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4);
  testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/aggregatecluster");

  cleanupUpstreamAndDownstream();
}

// Test that the PreviousPriorities retry predicate works as expected. It is configured
// in this test to exclude a priority after a single failure, so the first failure
// on cluster_1 results in the retry going to cluster_2.
TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) {
  initialize();

  // Tell Envoy that cluster_2 is here.
  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster2_}, {}, "42");
  // The '4' includes the fake CDS server and aggregate cluster.
  test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4);

  codec_client_ = makeHttpConnection(lookupPort("http"));
  auto response = codec_client_->makeRequestWithBody(
      Http::TestRequestHeaderMapImpl{{":method", "GET"},
                                     {":path", "/aggregatecluster"},
                                     {":scheme", "http"},
                                     {":authority", "host"},
                                     {"x-forwarded-for", "10.0.0.1"},
                                     {"x-envoy-retry-on", "5xx"}},
      1024);

  // First attempt lands on cluster_1; answer 503 and drop the connection to trigger a retry.
  waitForNextUpstreamRequest(FirstUpstreamIndex);
  upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, false);

  ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
  ASSERT_TRUE(fake_upstream_connection_->close());
  ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
  fake_upstream_connection_.reset();

  // The retry is expected on cluster_2, which answers with the default response headers.
  waitForNextUpstreamRequest(SecondUpstreamIndex);
  upstream_request_->encodeHeaders(default_response_headers_, true);

  response->waitForEndStream();
  EXPECT_TRUE(upstream_request_->complete());

  EXPECT_TRUE(response->complete());
  EXPECT_EQ("200", response->headers().getStatusValue());
  cleanupUpstreamAndDownstream();
}

} // namespace
} // namespace Envoy