| Crates.io | mysql_cdc |
| lib.rs | mysql_cdc |
| version | 0.2.1 |
| created_at | 2021-09-29 20:54:31.546108+00 |
| updated_at | 2023-07-13 07:42:52.110915+00 |
| description | MySQL/MariaDB binlog change data capture (CDC) connector for Rust |
| homepage | https://github.com/rusuly/mysql_cdc |
| repository | https://github.com/rusuly/mysql_cdc |
| max_upload_size | |
| id | 458361 |
| size | 172,535 |
MySQL/MariaDB binlog replication client for Rust
Please note the lib currently has the following limitations:
mysql_native_password and caching_sha2_password.Real-time replication client works the following way.
use mysql_cdc::binlog_client::BinlogClient;
use mysql_cdc::binlog_options::BinlogOptions;
use mysql_cdc::errors::Error;
use mysql_cdc::providers::mariadb::gtid::gtid_list::GtidList;
use mysql_cdc::providers::mysql::gtid::gtid_set::GtidSet;
use mysql_cdc::replica_options::ReplicaOptions;
use mysql_cdc::ssl_mode::SslMode;
fn main() -> Result<(), Error> {
// Start replication from MariaDB GTID
let _options = BinlogOptions::from_mariadb_gtid(GtidList::parse("0-1-270")?);
// Start replication from MySQL GTID
let gtid_set =
"d4c17f0c-4f11-11ea-93e3-325d3e1cd1c8:1-107, f442510a-2881-11ea-b1dd-27916133dbb2:1-7";
let _options = BinlogOptions::from_mysql_gtid(GtidSet::parse(gtid_set)?);
// Start replication from the position
let _options = BinlogOptions::from_position(String::from("mysql-bin.000008"), 195);
// Start replication from last master position.
// Useful when you are only interested in new changes.
let _options = BinlogOptions::from_end();
// Start replication from first event of first available master binlog.
// Note that binlog files by default have expiration time and deleted.
let options = BinlogOptions::from_start();
let options = ReplicaOptions {
username: String::from("root"),
password: String::from("Qwertyu1"),
blocking: true,
ssl_mode: SslMode::Disabled,
binlog: options,
..Default::default()
};
let mut client = BinlogClient::new(options);
for result in client.replicate()? {
let (header, event) = result?;
println!("{:#?}", header);
println!("{:#?}", event);
// You process an event here
// After you processed the event, you need to update replication position
client.commit(&header, &event);
}
Ok(())
}
A typical transaction has the following structure.
GtidEvent if gtid mode is enabled.TableMapEvent events.
WriteRowsEvent events.UpdateRowsEvent events.DeleteRowsEvent events.XidEvent indicating commit of the transaction.It's best practice to use GTID replication with the from_gtid method. Using the approach you can correctly perform replication failover.
Note that in GTID mode from_gtid has the following behavior:
from_gtid(@@gtid_purged) acts like from_start()from_gtid(@@gtid_executed) acts like from_end()In some cases you will need to read binlog files offline from the file system.
This can be done using BinlogReader class.
use mysql_cdc::{binlog_reader::BinlogReader, errors::Error};
use std::fs::File;
const PATH: &str = "mysql-bin.000001";
fn main() -> Result<(), Error> {
let file = File::open(PATH)?;
let reader = BinlogReader::new(file)?;
for result in reader.read_events() {
let (header, event) = result?;
println!("{:#?}", header);
println!("{:#?}", event);
}
Ok(())
}