syntax = "proto3"; package cdcpb; import "raft_cmdpb.proto"; import "metapb.proto"; import "errorpb.proto"; import "kvrpcpb.proto"; import "gogoproto/gogo.proto"; import "rustproto.proto"; option(gogoproto.sizer_all) = true; option(gogoproto.marshaler_all) = true; option(gogoproto.unmarshaler_all) = true; option(rustproto.lite_runtime_all) = true; option java_package = "org.tikv.kvproto"; message Header { uint64 cluster_id = 1; string ticdc_version = 2; } message DuplicateRequest { uint64 region_id = 1; } message Compatibility { string required_version = 1; } // ClusterIDMismatch is an error variable that // tells people that the cluster ID of the request does not match the TiKV cluster ID. message ClusterIDMismatch { // The current tikv cluster ID. uint64 current = 1; // The cluster ID of the TiCDC request. uint64 request = 2; } message Error { errorpb.NotLeader not_leader = 1; errorpb.RegionNotFound region_not_found = 2; errorpb.EpochNotMatch epoch_not_match = 3; DuplicateRequest duplicate_request = 4; Compatibility compatibility = 5; ClusterIDMismatch cluster_id_mismatch = 6; } message TxnInfo { uint64 start_ts = 1; bytes primary = 2; } message TxnStatus { uint64 start_ts = 1; uint64 min_commit_ts = 2; uint64 commit_ts = 3; bool is_rolled_back = 4; } message Event { enum LogType { UNKNOWN = 0; PREWRITE = 1; COMMIT = 2; ROLLBACK = 3; COMMITTED = 4; INITIALIZED = 5; } message Row { uint64 start_ts = 1; uint64 commit_ts = 2; LogType type = 3; enum OpType { UNKNOWN = 0; PUT = 1; DELETE = 2; } OpType op_type = 4; bytes key = 5; bytes value = 6; bytes old_value = 7; } message Entries { repeated Row entries = 1; } message Admin { raft_cmdpb.AdminRequest admin_request = 1; raft_cmdpb.AdminResponse admin_response = 2; } message LongTxn { repeated TxnInfo txn_info = 1; } uint64 region_id = 1; uint64 index = 2; uint64 request_id = 7; oneof event { Entries entries = 3; Admin admin = 4; Error error = 5; uint64 resolved_ts = 6 [deprecated=true]; // Note that field 7 is taken by request_id. LongTxn long_txn = 8; // More region level events ... } } message ChangeDataEvent { repeated Event events = 1; ResolvedTs resolved_ts = 2; // More store level events ... } message ResolvedTs { repeated uint64 regions = 1; uint64 ts = 2; } message ChangeDataRequest { message Register {} message NotifyTxnStatus { repeated TxnStatus txn_status = 1; } Header header = 1; uint64 region_id = 2; metapb.RegionEpoch region_epoch = 3; uint64 checkpoint_ts = 4; bytes start_key = 5; bytes end_key = 6; // Used for CDC to identify events corresponding to different requests. uint64 request_id = 7; kvrpcpb.ExtraOp extra_op = 8; oneof request { // A normal request that trying to register change data feed on a region. Register register = 9; // Notify the region that some of the running transactions on the region has a pushed // min_commit_ts so that the resolved_ts can be advanced. NotifyTxnStatus notify_txn_status = 10; } } service ChangeData { rpc EventFeed(stream ChangeDataRequest) returns(stream ChangeDataEvent); }