Crates.io | mysql_cdc |
lib.rs | mysql_cdc |
version | 0.2.1 |
source | src |
created_at | 2021-09-29 20:54:31.546108 |
updated_at | 2023-07-13 07:42:52.110915 |
description | MySQL/MariaDB binlog change data capture (CDC) connector for Rust |
homepage | https://github.com/rusuly/mysql_cdc |
repository | https://github.com/rusuly/mysql_cdc |
max_upload_size | |
id | 458361 |
size | 172,535 |
MySQL/MariaDB binlog replication client for Rust
Please note the lib currently has the following limitations:
mysql_native_password
and caching_sha2_password
.Real-time replication client works the following way.
use mysql_cdc::binlog_client::BinlogClient;
use mysql_cdc::binlog_options::BinlogOptions;
use mysql_cdc::errors::Error;
use mysql_cdc::providers::mariadb::gtid::gtid_list::GtidList;
use mysql_cdc::providers::mysql::gtid::gtid_set::GtidSet;
use mysql_cdc::replica_options::ReplicaOptions;
use mysql_cdc::ssl_mode::SslMode;
fn main() -> Result<(), Error> {
// Start replication from MariaDB GTID
let _options = BinlogOptions::from_mariadb_gtid(GtidList::parse("0-1-270")?);
// Start replication from MySQL GTID
let gtid_set =
"d4c17f0c-4f11-11ea-93e3-325d3e1cd1c8:1-107, f442510a-2881-11ea-b1dd-27916133dbb2:1-7";
let _options = BinlogOptions::from_mysql_gtid(GtidSet::parse(gtid_set)?);
// Start replication from the position
let _options = BinlogOptions::from_position(String::from("mysql-bin.000008"), 195);
// Start replication from last master position.
// Useful when you are only interested in new changes.
let _options = BinlogOptions::from_end();
// Start replication from first event of first available master binlog.
// Note that binlog files by default have expiration time and deleted.
let options = BinlogOptions::from_start();
let options = ReplicaOptions {
username: String::from("root"),
password: String::from("Qwertyu1"),
blocking: true,
ssl_mode: SslMode::Disabled,
binlog: options,
..Default::default()
};
let mut client = BinlogClient::new(options);
for result in client.replicate()? {
let (header, event) = result?;
println!("{:#?}", header);
println!("{:#?}", event);
// You process an event here
// After you processed the event, you need to update replication position
client.commit(&header, &event);
}
Ok(())
}
A typical transaction has the following structure.
GtidEvent
if gtid mode is enabled.TableMapEvent
events.
WriteRowsEvent
events.UpdateRowsEvent
events.DeleteRowsEvent
events.XidEvent
indicating commit of the transaction.It's best practice to use GTID replication with the from_gtid
method. Using the approach you can correctly perform replication failover.
Note that in GTID mode from_gtid
has the following behavior:
from_gtid(@@gtid_purged)
acts like from_start()
from_gtid(@@gtid_executed)
acts like from_end()
In some cases you will need to read binlog files offline from the file system.
This can be done using BinlogReader
class.
use mysql_cdc::{binlog_reader::BinlogReader, errors::Error};
use std::fs::File;
const PATH: &str = "mysql-bin.000001";
fn main() -> Result<(), Error> {
let file = File::open(PATH)?;
let reader = BinlogReader::new(file)?;
for result in reader.read_events() {
let (header, event) = result?;
println!("{:#?}", header);
println!("{:#?}", event);
}
Ok(())
}