/* Copyright (c) 2013, 2024, Oracle and/or its affiliates. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is designed to work with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have either included with the program or referenced in the documentation. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #ifndef MTA_SUBMODE_H #define MTA_SUBMODE_H #include #include #include #include #include #include "log_event.h" #include "my_inttypes.h" #include "my_thread_local.h" // my_thread_id #include "mysql/binlog/event/binlog_event.h" // SEQ_UNINIT #include "prealloced_array.h" // Prealloced_array class Log_event; class Query_log_event; class Relay_log_info; class Slave_worker; class THD; struct TABLE; typedef Prealloced_array Slave_worker_array; enum enum_mts_parallel_type { /* Parallel slave based on Database name */ MTS_PARALLEL_TYPE_DB_NAME = 0, /* Parallel slave based on group information from Binlog group commit */ MTS_PARALLEL_TYPE_LOGICAL_CLOCK = 1 }; // Extend the following class as per requirement for each sub mode class Mts_submode { private: protected: /* Parallel type */ enum_mts_parallel_type type; public: Mts_submode() = default; inline enum_mts_parallel_type get_type() { return type; } // pure virtual methods. Should be extended in the derieved class /* Logic to schedule the next event. called at the B event for each transaction */ virtual int schedule_next_event(Relay_log_info *rli, Log_event *ev) = 0; /* logic to attach temp tables Should be extended in the derieved class */ virtual void attach_temp_tables(THD *thd, const Relay_log_info *rli, Query_log_event *ev) = 0; /* logic to detach temp tables. Should be extended in the derieved class */ virtual void detach_temp_tables(THD *thd, const Relay_log_info *rli, Query_log_event *ev) = 0; /* returns the least occupied worker. Should be extended in the derieved class */ virtual Slave_worker *get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *ws, Log_event *ev) = 0; /* wait for slave workers to finish */ virtual int wait_for_workers_to_finish(Relay_log_info *rli, Slave_worker *ignore = nullptr) = 0; /// @brief indicates the start of new file, which may e.g. update internal /// counters in the submode virtual void indicate_start_of_new_file() {} /** Sets additional context before the event is set to execute. */ virtual bool set_multi_threaded_applier_context(const Relay_log_info &, Log_event &) { return false; } virtual ~Mts_submode() = default; }; /** DB partitioned submode For significance of each method check definition of Mts_submode */ class Mts_submode_database : public Mts_submode { public: Mts_submode_database() { type = MTS_PARALLEL_TYPE_DB_NAME; } int schedule_next_event(Relay_log_info *rli, Log_event *ev) override; void attach_temp_tables(THD *thd, const Relay_log_info *rli, Query_log_event *ev) override; void detach_temp_tables(THD *thd, const Relay_log_info *rli, Query_log_event *ev) override; Slave_worker *get_least_occupied_worker(Relay_log_info *, Slave_worker_array *ws, Log_event *) override; ~Mts_submode_database() override = default; int wait_for_workers_to_finish(Relay_log_info *rli, Slave_worker *ignore = nullptr) override; bool set_multi_threaded_applier_context(const Relay_log_info &rli, Log_event &ev) override; private: bool unfold_transaction_payload_event( mysql::binlog::event::Format_description_event &fde, Transaction_payload_log_event &tple, std::vector &events); }; /** Parallelization using Master parallelization information For significance of each method check definition of Mts_submode */ class Mts_submode_logical_clock : public Mts_submode { private: bool first_event, force_new_group; bool is_new_group; uint delegated_jobs; /* "instant" value of committed transactions low-water-mark */ std::atomic last_lwm_timestamp; /* GAQ index corresponding to the min commit point */ ulong last_lwm_index; longlong last_committed; longlong sequence_number; public: uint jobs_done; bool is_error; /* the logical timestamp of the olderst transaction that is being waited by before to resume scheduling. */ std::atomic min_waited_timestamp; /* Committed transactions and those that are waiting for their commit parents comprise sequences whose items are identified as GAQ index. An empty sequence is described by the following magic value which can't be in the GAQ legitimate range. todo: an alternative could be to pass a magic value to the constructor. E.g GAQ.size as a good candidate being outside of the valid range. That requires further wl6314 refactoring in activation/deactivation of the scheduler. */ static const ulong INDEX_UNDEF = (ulong)-1; protected: std::pair get_server_and_thread_id(TABLE *table); Slave_worker *get_free_worker(Relay_log_info *rli); public: Mts_submode_logical_clock(); int schedule_next_event(Relay_log_info *rli, Log_event *ev) override; void attach_temp_tables(THD *thd, const Relay_log_info *rli, Query_log_event *ev) override; void detach_temp_tables(THD *thd, const Relay_log_info *rli, Query_log_event *) override; Slave_worker *get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *ws, Log_event *ev) override; /* Sets the force new group variable */ inline void start_new_group() { force_new_group = true; } /// @brief Sets a flag to indicate that we are starting a new binlog file, /// therefore we need to skip the check for logical clock to not compare /// against sequence_number from previous event (previous file) void indicate_start_of_new_file() override { first_event = true; } /** Withdraw the delegated_job increased by the group. */ void withdraw_delegated_job() { delegated_jobs--; } int wait_for_workers_to_finish(Relay_log_info *rli, Slave_worker *ignore = nullptr) override; bool wait_for_last_committed_trx(Relay_log_info *rli, longlong last_committed_arg); /* LEQ comparison of two logical timestamps follows regular rules for integers. SEQ_UNINIT is regarded as the least value in the clock domain. @param a the lhs logical timestamp value @param b the rhs logical timestamp value @return true when a "<=" b, false otherwise */ static bool clock_leq(longlong a, longlong b) { if (a == SEQ_UNINIT) return true; else if (b == SEQ_UNINIT) return false; else return a <= b; } longlong get_lwm_timestamp(Relay_log_info *rli, bool need_lock); longlong estimate_lwm_timestamp() { return last_lwm_timestamp.load(); } ~Mts_submode_logical_clock() override = default; }; #endif /*MTA_SUBMODE_H*/