#pragma once #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" #include "common/upstream/cluster_factory_impl.h" #include "common/upstream/upstream_impl.h" #include "extensions/clusters/aggregate/lb_context.h" namespace Envoy { namespace Extensions { namespace Clusters { namespace Aggregate { using PriorityToClusterVector = std::vector>; // Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster. using ClusterAndPriorityToLinearizedPriorityMap = absl::flat_hash_map, uint32_t>; struct PriorityContext { Upstream::PrioritySetImpl priority_set_; PriorityToClusterVector priority_to_cluster_; ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_; }; using PriorityContextPtr = std::unique_ptr; class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbacks { public: Cluster(const envoy::config::cluster::v3::Cluster& cluster, const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config, Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime, Random::RandomGenerator& random, Server::Configuration::TransportSocketFactoryContextImpl& factory_context, Stats::ScopePtr&& stats_scope, ThreadLocal::SlotAllocator& tls, bool added_via_api); // Upstream::Cluster Upstream::Cluster::InitializePhase initializePhase() const override { return Upstream::Cluster::InitializePhase::Secondary; } // Upstream::ClusterUpdateCallbacks void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override; void onClusterRemoval(const std::string& cluster_name) override; void refresh() { refresh([](const std::string&) { return false; }); } Upstream::ClusterUpdateCallbacksHandlePtr handle_; Upstream::ClusterManager& cluster_manager_; Runtime::Loader& runtime_; Random::RandomGenerator& random_; ThreadLocal::TypedSlot<> tls_; const std::vector clusters_; private: // Upstream::ClusterImplBase void startPreInit() override; void refresh(const std::function& skip_predicate); PriorityContextPtr linearizePrioritySet(const std::function& skip_predicate); }; // Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities // are updated. class AggregateClusterLoadBalancer : public Upstream::LoadBalancer { public: AggregateClusterLoadBalancer( Upstream::ClusterStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random, const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) : stats_(stats), runtime_(runtime), random_(random), common_config_(common_config) {} // Upstream::LoadBalancer Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; // Prefetching not yet implemented for extensions. Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { return nullptr; } private: // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty. class LoadBalancerImpl : public Upstream::LoadBalancerBase { public: LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random, const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) : Upstream::LoadBalancerBase(priority_context.priority_set_, stats, runtime, random, common_config), priority_context_(priority_context) {} // Upstream::LoadBalancer Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; // Prefetching not yet implemented for extensions. Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { return nullptr; } // Upstream::LoadBalancerBase Upstream::HostConstSharedPtr chooseHostOnce(Upstream::LoadBalancerContext*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } absl::optional hostToLinearizedPriority(const Upstream::HostDescription& host) const; private: const PriorityContext& priority_context_; }; using LoadBalancerImplPtr = std::unique_ptr; LoadBalancerImplPtr load_balancer_; Upstream::ClusterStats& stats_; Runtime::Loader& runtime_; Random::RandomGenerator& random_; const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config_; PriorityContextPtr priority_context_; public: void refresh(PriorityContextPtr priority_context) { if (!priority_context->priority_set_.hostSetsPerPriority().empty()) { load_balancer_ = std::make_unique(*priority_context, stats_, runtime_, random_, common_config_); } else { load_balancer_ = nullptr; } priority_context_ = std::move(priority_context); } }; // Load balancer factory created by the main thread and will be called in each worker thread to // create the thread local load balancer. struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory { AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {} // Upstream::LoadBalancerFactory Upstream::LoadBalancerPtr create() override { return std::make_unique( cluster_.info()->stats(), cluster_.runtime_, cluster_.random_, cluster_.info()->lbConfig()); } const Cluster& cluster_; }; // Thread aware load balancer created by the main thread. struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { AggregateThreadAwareLoadBalancer(const Cluster& cluster) : cluster_(cluster) {} // Upstream::ThreadAwareLoadBalancer Upstream::LoadBalancerFactorySharedPtr factory() override { return std::make_shared(cluster_); } void initialize() override {} const Cluster& cluster_; }; class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase< envoy::extensions::clusters::aggregate::v3::ClusterConfig> { public: ClusterFactory() : ConfigurableClusterFactoryBase(Extensions::Clusters::ClusterTypes::get().Aggregate) {} private: std::pair createClusterWithConfig( const envoy::config::cluster::v3::Cluster& cluster, const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config, Upstream::ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContextImpl& socket_factory_context, Stats::ScopePtr&& stats_scope) override; }; DECLARE_FACTORY(ClusterFactory); } // namespace Aggregate } // namespace Clusters } // namespace Extensions } // namespace Envoy