/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
syntax = "proto2";

package pulsar.proto;

option java_package = "org.apache.pulsar.common.api.proto";
option optimize_for = LITE_RUNTIME;

// A schema definition: name, raw schema bytes, schema type and
// free-form key/value properties.
message Schema {
  enum Type {
    None = 0;
    String = 1;
    Json = 2;
    Protobuf = 3;
    Avro = 4;
    Bool = 5;
    Int8 = 6;
    Int16 = 7;
    Int32 = 8;
    Int64 = 9;
    Float = 10;
    Double = 11;
    Date = 12;
    Time = 13;
    Timestamp = 14;
    KeyValue = 15;
    Instant = 16;
    LocalDate = 17;
    LocalTime = 18;
    LocalDateTime = 19;
    ProtobufNative = 20;
  }

  required string name = 1;
  // NOTE: field number 2 is not used by this message.
  required bytes schema_data = 3;
  required Type type = 4;
  repeated KeyValue properties = 5;
}

// Identifier of a message: ledger/entry pair plus optional partition
// and batch coordinates.
message MessageIdData {
  required uint64 ledgerId = 1;
  required uint64 entryId = 2;
  optional int32 partition = 3 [default = -1];
  optional int32 batch_index = 4 [default = -1];
  repeated int64 ack_set = 5;
  optional int32 batch_size = 6;
}

// String key with a string value.
message KeyValue {
  required string key = 1;
  required string value = 2;
}

// String key with an unsigned 64-bit value.
message KeyLongValue {
  required string key = 1;
  required uint64 value = 2;
}

// Integer range described by its start and end.
message IntRange {
  required int32 start = 1;
  required int32 end = 2;
}

// A named key with its value bytes and key/value metadata.
message EncryptionKeys {
  required string key = 1;
  required bytes value = 2;
  repeated KeyValue metadata = 3;
}

enum CompressionType {
  NONE = 0;
  LZ4 = 1;
  ZLIB = 2;
  ZSTD = 3;
  SNAPPY = 4;
}

enum ProducerAccessMode {
  // By default multiple producers can publish on a topic
  Shared = 0;
  // Require exclusive access for producer. Fail immediately if there's
  // already a producer connected.
  Exclusive = 1;
  // Producer creation is pending until it can acquire exclusive access
  WaitForExclusive = 2;
}

message MessageMetadata {
  required string producer_name = 1;
  required uint64 sequence_id = 2;
  required uint64 publish_time = 3;
  repeated KeyValue properties = 4;

  // Property set on replicated message,
  // includes the source cluster name
  optional string replicated_from = 5;

  // Key to decide partition for the msg
  optional string partition_key = 6;

  // Override namespace's replication
  repeated string replicate_to = 7;

  optional CompressionType compression = 8 [default = NONE];
  optional uint32 uncompressed_size = 9 [default = 0];

  // Removed below checksum field from Metadata as
  // it should be part of send-command which keeps checksum of header + payload
  //optional sfixed64 checksum = 10;

  // differentiate single and batch message metadata
  optional int32 num_messages_in_batch = 11 [default = 1];

  // the timestamp that this event occurs. it is typically set by applications.
  // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
  optional uint64 event_time = 12 [default = 0];

  // Contains encryption key name, encrypted key and metadata to describe the key
  repeated EncryptionKeys encryption_keys = 13;

  // Algorithm used to encrypt data key
  optional string encryption_algo = 14;

  // Additional parameters required by encryption
  optional bytes encryption_param = 15;

  optional bytes schema_version = 16;

  optional bool partition_key_b64_encoded = 17 [default = false];

  // Specify a key to overwrite the message key which is used for ordering
  // dispatch in Key_Shared mode.
  optional bytes ordering_key = 18;

  // Mark the message to be delivered at or after the specified timestamp
  optional int64 deliver_at_time = 19;

  // Identify whether a message is a "marker" message used for
  // internal metadata instead of application published data.
  // Markers will generally not be propagated back to clients
  optional int32 marker_type = 20;

  // NOTE: field number 21 is not used by this message.

  // transaction related message info
  optional uint64 txnid_least_bits = 22;
  optional uint64 txnid_most_bits = 23;

  /// Add highest sequence id to support batch message with external sequence id
  optional uint64 highest_sequence_id = 24 [default = 0];

  // Indicate if the message payload value is set
  optional bool null_value = 25 [default = false];

  optional string uuid = 26;
  optional int32 num_chunks_from_msg = 27;
  optional int32 total_chunk_msg_size = 28;
  optional int32 chunk_id = 29;

  // Indicate if the message partition key is set
  optional bool null_partition_key = 30 [default = false];
}

message SingleMessageMetadata {
  repeated KeyValue properties = 1;
  optional string partition_key = 2;
  required int32 payload_size = 3;
  optional bool compacted_out = 4 [default = false];

  // the timestamp that this event occurs. it is typically set by applications.
  // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
  optional uint64 event_time = 5 [default = 0];

  optional bool partition_key_b64_encoded = 6 [default = false];

  // Specify a key to overwrite the message key which is used for ordering
  // dispatch in Key_Shared mode.
  optional bytes ordering_key = 7;

  // Allows consumer retrieve the sequence id that the producer set.
  optional uint64 sequence_id = 8;

  // Indicate if the message payload value is set
  optional bool null_value = 9 [default = false];

  // Indicate if the message partition key is set
  optional bool null_partition_key = 10 [default = false];
}

// metadata added for entry from broker
message BrokerEntryMetadata {
  optional uint64 broker_timestamp = 1;
  optional uint64 index = 2;
}

enum ServerError {
  UnknownError = 0;
  // Error with ZK/metadata
  MetadataError = 1;
  // Error writing to or reading from BK
  PersistenceError = 2;
  // Invalid authentication
  AuthenticationError = 3;
  // Not authorized to use resource
  AuthorizationError = 4;

  // Unable to subscribe/unsubscribe because
  // other consumers are connected
  ConsumerBusy = 5;
  // Any error that requires client retry operation with a fresh lookup
  ServiceNotReady = 6;
  // Unable to create producer because backlog quota exceeded
  ProducerBlockedQuotaExceededError = 7;
  // Exception while creating producer because quota exceeded
  ProducerBlockedQuotaExceededException = 8;
  // Error while verifying message checksum
  ChecksumError = 9;
  // Error when an older client/version doesn't support a required feature
  UnsupportedVersionError = 10;
  // Topic not found
  TopicNotFound = 11;
  // Subscription not found
  SubscriptionNotFound = 12;
  // Consumer not found
  ConsumerNotFound = 13;
  // Error with too many simultaneous requests
  TooManyRequests = 14;
  // The topic has been terminated
  TopicTerminatedError = 15;

  // Producer with same name is already connected
  ProducerBusy = 16;
  // The topic name is not valid
  InvalidTopicName = 17;

  // Specified schema was incompatible with topic schema
  IncompatibleSchema = 18;
  // Dispatcher assign consumer error
  ConsumerAssignError = 19;

  // Transaction coordinator not found error
  TransactionCoordinatorNotFound = 20;
  // Invalid txn status error
  InvalidTxnStatus = 21;
  // Not allowed error
  NotAllowedError = 22;

  // Ack with transaction conflict
  TransactionConflict = 23;
  // Transaction not found
  TransactionNotFound = 24;

  // When a producer asks and fails to get exclusive producer access,
  // or loses the exclusive status after a reconnection, the broker will
  // use this error to indicate that this producer is now permanently
  // fenced. Applications are now supposed to close it and create a
  // new producer
  ProducerFenced = 25;
}

enum AuthMethod {
  AuthMethodNone = 0;
  AuthMethodYcaV1 = 1;
  AuthMethodAthens = 2;
}

// Each protocol version identify new features that are
// incrementally added to the protocol
enum ProtocolVersion {
  // Initial versioning
  v0 = 0;
  // Added application keep-alive
  v1 = 1;
  // Added RedeliverUnacknowledgedMessages Command
  v2 = 2;
  // Added compression with LZ4 and ZLib
  v3 = 3;
  // Added batch message support
  v4 = 4;
  // Added disconnect client w/o closing connection
  v5 = 5;
  // Added checksum computation for metadata + payload
  v6 = 6;
  // Added CommandLookupTopic - Binary Lookup
  v7 = 7;
  // Added CommandConsumerStats - Client fetches broker side consumer stats
  v8 = 8;
  // Added end of topic notification
  v9 = 9;
  // Added proxy to broker
  v10 = 10;
  // C++ consumers before this version are not correctly handling the checksum field
  v11 = 11;
  // Added get topic's last messageId from broker
  // Added CommandActiveConsumerChange
  // Added CommandGetTopicsOfNamespace
  v12 = 12;
  // Schema-registry : added avro schema format for json
  v13 = 13;
  // Add CommandAuthChallenge and CommandAuthResponse for mutual auth
  // Added Key_Shared subscription
  v14 = 14;
  // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse
  v15 = 15;
  // Add support for raw message metadata
  v16 = 16;
  // Added support ack receipt
  v17 = 17;
}

message CommandConnect {
  required string client_version = 1;
  // Deprecated. Use "auth_method_name" instead.
  optional AuthMethod auth_method = 2;
  optional string auth_method_name = 5;
  optional bytes auth_data = 3;
  optional int32 protocol_version = 4 [default = 0];

  // Client can ask to be proxied to a specific broker
  // This is only honored by a Pulsar proxy
  optional string proxy_to_broker_url = 6;

  // Original principal that was verified by
  // a Pulsar proxy. In this case the auth info above
  // will be the auth of the proxy itself
  optional string original_principal = 7;

  // Original auth role and auth Method that was passed
  // to the proxy. In this case the auth info above
  // will be the auth of the proxy itself
  optional string original_auth_data = 8;
  optional string original_auth_method = 9;

  // Feature flags
  optional FeatureFlags feature_flags = 10;
}

message FeatureFlags {
  optional bool supports_auth_refresh = 1 [default = false];
  optional bool supports_broker_entry_metadata = 2 [default = false];
}

message CommandConnected {
  required string server_version = 1;
  optional int32 protocol_version = 2 [default = 0];
  optional int32 max_message_size = 3;
}

message CommandAuthResponse {
  optional string client_version = 1;
  optional AuthData response = 2;
  optional int32 protocol_version = 3 [default = 0];
}

message CommandAuthChallenge {
  optional string server_version = 1;
  optional AuthData challenge = 2;
  optional int32 protocol_version = 3 [default = 0];
}

// To support mutual authentication type, such as Sasl, reuse this command to mutual auth.
message AuthData {
  optional string auth_method_name = 1;
  optional bytes auth_data = 2;
}

enum KeySharedMode {
  AUTO_SPLIT = 0;
  STICKY = 1;
}

message KeySharedMeta {
  required KeySharedMode keySharedMode = 1;
  // NOTE: field number 2 is not used by this message.
  repeated IntRange hashRanges = 3;
  optional bool allowOutOfOrderDelivery = 4 [default = false];
}

message CommandSubscribe {
  enum SubType {
    Exclusive = 0;
    Shared = 1;
    Failover = 2;
    Key_Shared = 3;
  }

  required string topic = 1;
  required string subscription = 2;
  required SubType subType = 3;

  required uint64 consumer_id = 4;
  required uint64 request_id = 5;
  optional string consumer_name = 6;
  optional int32 priority_level = 7;

  // Signal whether the subscription should be backed by a
  // durable cursor or not
  optional bool durable = 8 [default = true];

  // If specified, the subscription will position the cursor
  // mark-delete position on the particular message id and
  // will send messages from that point
  optional MessageIdData start_message_id = 9;

  /// Add optional metadata key=value to this consumer
  repeated KeyValue metadata = 10;

  optional bool read_compacted = 11;

  optional Schema schema = 12;

  enum InitialPosition {
    Latest = 0;
    Earliest = 1;
  }

  // Signal whether the subscription will initialize on latest
  // or not -- earliest
  optional InitialPosition initialPosition = 13 [default = Latest];

  // Mark the subscription as "replicated". Pulsar will make sure
  // to periodically sync the state of replicated subscriptions
  // across different clusters (when using geo-replication).
  optional bool replicate_subscription_state = 14;

  // If true, the subscribe operation will cause a topic to be
  // created if it does not exist already (and if topic auto-creation
  // is allowed by broker).
  // If false, the subscribe operation will fail if the topic
  // does not exist.
  optional bool force_topic_creation = 15 [default = true];

  // If specified, the subscription will reset cursor's position back
  // to specified seconds and will send messages from that point
  optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

  optional KeySharedMeta keySharedMeta = 17;
}

message CommandPartitionedTopicMetadata {
  required string topic = 1;
  required uint64 request_id = 2;

  // TODO - Remove original_principal, original_auth_data, original_auth_method
  // Original principal that was verified by
  // a Pulsar proxy.
  optional string original_principal = 3;

  // Original auth role and auth Method that was passed
  // to the proxy.
  optional string original_auth_data = 4;
  optional string original_auth_method = 5;
}

message CommandPartitionedTopicMetadataResponse {
  enum LookupType {
    Success = 0;
    Failed = 1;
  }

  // Optional in case of error
  optional uint32 partitions = 1;
  required uint64 request_id = 2;
  optional LookupType response = 3;
  optional ServerError error = 4;
  optional string message = 5;
}

message CommandLookupTopic {
  required string topic = 1;
  required uint64 request_id = 2;
  optional bool authoritative = 3 [default = false];

  // TODO - Remove original_principal, original_auth_data, original_auth_method
  // Original principal that was verified by
  // a Pulsar proxy.
  optional string original_principal = 4;

  // Original auth role and auth Method that was passed
  // to the proxy.
  optional string original_auth_data = 5;
  optional string original_auth_method = 6;

  // optional string advertised_listener_name = 7;
}

message CommandLookupTopicResponse {
  enum LookupType {
    Redirect = 0;
    Connect = 1;
    Failed = 2;
  }

  // Optional in case of error
  optional string brokerServiceUrl = 1;
  optional string brokerServiceUrlTls = 2;
  optional LookupType response = 3;
  required uint64 request_id = 4;
  optional bool authoritative = 5 [default = false];
  optional ServerError error = 6;
  optional string message = 7;

  // If it's true, indicates to the client that it must
  // always connect through the service url after the
  // lookup has been completed.
  optional bool proxy_through_service_url = 8 [default = false];
}

/// Create a new Producer on a topic, assigning the given producer_id,
/// all messages sent with this producer_id will be persisted on the topic
message CommandProducer {
  required string topic = 1;
  required uint64 producer_id = 2;
  required uint64 request_id = 3;

  /// If a producer name is specified, the name will be used,
  /// otherwise the broker will generate a unique name
  optional string producer_name = 4;

  optional bool encrypted = 5 [default = false];

  /// Add optional metadata key=value to this producer
  repeated KeyValue metadata = 6;

  optional Schema schema = 7;

  // If producer reconnect to broker, the epoch of this producer will +1
  optional uint64 epoch = 8 [default = 0];

  // Indicate the name of the producer is generated or user provided
  // Use default true here is in order to be forward compatible with the client
  optional bool user_provided_producer_name = 9 [default = true];

  // Require that this producers will be the only producer allowed on the topic
  optional ProducerAccessMode producer_access_mode = 10 [default = Shared];

  // Topic epoch is used to fence off producers that reconnects after a new
  // exclusive producer has already taken over. This id is assigned by the
  // broker on the CommandProducerSuccess. The first time, the client will
  // leave it empty and then it will always carry the same epoch number on
  // the subsequent reconnections.
  optional uint64 topic_epoch = 11;
}

message CommandSend {
  required uint64 producer_id = 1;
  required uint64 sequence_id = 2;
  optional int32 num_messages = 3 [default = 1];
  optional uint64 txnid_least_bits = 4 [default = 0];
  optional uint64 txnid_most_bits = 5 [default = 0];

  /// Add highest sequence id to support batch message with external sequence id
  optional uint64 highest_sequence_id = 6 [default = 0];
  optional bool is_chunk = 7 [default = false];
}

message CommandSendReceipt {
  required uint64 producer_id = 1;
  required uint64 sequence_id = 2;
  optional MessageIdData message_id = 3;
  optional uint64 highest_sequence_id = 4 [default = 0];
}

message CommandSendError {
  required uint64 producer_id = 1;
  required uint64 sequence_id = 2;
  required ServerError error = 3;
  required string message = 4;
}

message CommandMessage {
  required uint64 consumer_id = 1;
  required MessageIdData message_id = 2;
  optional uint32 redelivery_count = 3 [default = 0];
  repeated int64 ack_set = 4;
}

message CommandAck {
  enum AckType {
    Individual = 0;
    Cumulative = 1;
  }

  required uint64 consumer_id = 1;
  required AckType ack_type = 2;

  // In case of individual acks, the client can pass a list of message ids
  repeated MessageIdData message_id = 3;

  // Acks can contain a flag to indicate the consumer
  // received an invalid message that got discarded
  // before being passed on to the application.
  enum ValidationError {
    UncompressedSizeCorruption = 0;
    DecompressionError = 1;
    ChecksumMismatch = 2;
    BatchDeSerializeError = 3;
    DecryptionError = 4;
  }

  optional ValidationError validation_error = 4;
  repeated KeyLongValue properties = 5;

  optional uint64 txnid_least_bits = 6 [default = 0];
  optional uint64 txnid_most_bits = 7 [default = 0];
  optional uint64 request_id = 8;
}

message CommandAckResponse {
  required uint64 consumer_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
  optional uint64 request_id = 6;
}

// changes on active consumer
message CommandActiveConsumerChange {
  required uint64 consumer_id = 1;
  optional bool is_active = 2 [default = false];
}

message CommandFlow {
  required uint64 consumer_id = 1;

  // Max number of messages to prefetch, in addition
  // of any number previously specified
  required uint32 messagePermits = 2;
}

message CommandUnsubscribe {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;
}

// Reset an existing consumer to a particular message id
message CommandSeek {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;

  optional MessageIdData message_id = 3;
  optional uint64 message_publish_time = 4;
}

// Message sent by broker to client when a topic
// has been forcefully terminated and there are no more
// messages left to consume
message CommandReachedEndOfTopic {
  required uint64 consumer_id = 1;
}

message CommandCloseProducer {
  required uint64 producer_id = 1;
  required uint64 request_id = 2;
}

message CommandCloseConsumer {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;
}

message CommandRedeliverUnacknowledgedMessages {
  required uint64 consumer_id = 1;
  repeated MessageIdData message_ids = 2;
}

message CommandSuccess {
  required uint64 request_id = 1;
  optional Schema schema = 2;
}

/// Response from CommandProducer
message CommandProducerSuccess {
  required uint64 request_id = 1;
  required string producer_name = 2;

  // The last sequence id that was stored by this producer in the previous session
  // This will only be meaningful if deduplication has been enabled.
  optional int64 last_sequence_id = 3 [default = -1];
  optional bytes schema_version = 4;

  // The topic epoch assigned by the broker. This field will only be set if we
  // were requiring exclusive access when creating the producer.
  optional uint64 topic_epoch = 5;

  // If producer is not "ready", the client will avoid to timeout the request
  // for creating the producer. Instead it will wait indefinitely until it gets
  // a subsequent `CommandProducerSuccess` with `producer_ready==true`.
  optional bool producer_ready = 6 [default = true];
}

message CommandError {
  required uint64 request_id = 1;
  required ServerError error = 2;
  required string message = 3;
}

// Commands to probe the state of connection.
// When either client or broker doesn't receive commands for certain
// amount of time, they will send a Ping probe.
message CommandPing {
}

message CommandPong {
}

message CommandConsumerStats {
  required uint64 request_id = 1;
  // required string topic_name = 2;
  // required string subscription_name = 3;
  required uint64 consumer_id = 4;
}

message CommandConsumerStatsResponse {
  required uint64 request_id = 1;
  optional ServerError error_code = 2;
  optional string error_message = 3;

  /// Total rate of messages delivered to the consumer. msg/s
  optional double msgRateOut = 4;

  /// Total throughput delivered to the consumer. bytes/s
  optional double msgThroughputOut = 5;

  /// Total rate of messages redelivered by this consumer. msg/s
  optional double msgRateRedeliver = 6;

  /// Name of the consumer
  optional string consumerName = 7;

  /// Number of available message permits for the consumer
  optional uint64 availablePermits = 8;

  /// Number of unacknowledged messages for the consumer
  optional uint64 unackedMessages = 9;

  /// Flag to verify if consumer is blocked due to reaching threshold of unacked messages
  optional bool blockedConsumerOnUnackedMsgs = 10;

  /// Address of this consumer
  optional string address = 11;

  /// Timestamp of connection
  optional string connectedSince = 12;

  /// Whether this subscription is Exclusive or Shared or Failover
  optional string type = 13;

  /// Total rate of messages expired on this subscription. msg/s
  optional double msgRateExpired = 14;

  /// Number of messages in the subscription backlog
  optional uint64 msgBacklog = 15;
}

message CommandGetLastMessageId {
  required uint64 consumer_id = 1;
  required uint64 request_id = 2;
}

message CommandGetLastMessageIdResponse {
  required MessageIdData last_message_id = 1;
  required uint64 request_id = 2;
}

message CommandGetTopicsOfNamespace {
  enum Mode {
    PERSISTENT = 0;
    NON_PERSISTENT = 1;
    ALL = 2;
  }

  required uint64 request_id = 1;
  required string namespace = 2;
  optional Mode mode = 3 [default = PERSISTENT];
}

message CommandGetTopicsOfNamespaceResponse {
  required uint64 request_id = 1;
  repeated string topics = 2;
}

message CommandGetSchema {
  required uint64 request_id = 1;
  required string topic = 2;

  optional bytes schema_version = 3;
}

message CommandGetSchemaResponse {
  required uint64 request_id = 1;
  optional ServerError error_code = 2;
  optional string error_message = 3;

  optional Schema schema = 4;
  optional bytes schema_version = 5;
}

message CommandGetOrCreateSchema {
  required uint64 request_id = 1;
  required string topic = 2;
  required Schema schema = 3;
}

message CommandGetOrCreateSchemaResponse {
  required uint64 request_id = 1;
  optional ServerError error_code = 2;
  optional string error_message = 3;

  optional bytes schema_version = 4;
}

/// --- transaction related ---

enum TxnAction {
  COMMIT = 0;
  ABORT = 1;
}

message CommandNewTxn {
  required uint64 request_id = 1;
  optional uint64 txn_ttl_seconds = 2 [default = 0];
  optional uint64 tc_id = 3 [default = 0];
}

message CommandNewTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}

message CommandAddPartitionToTxn {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  repeated string partitions = 4;
}

message CommandAddPartitionToTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}

// A topic name paired with a subscription name.
message Subscription {
  required string topic = 1;
  required string subscription = 2;
}

message CommandAddSubscriptionToTxn {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  repeated Subscription subscription = 4;
}

message CommandAddSubscriptionToTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}

message CommandEndTxn {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional TxnAction txn_action = 4;
}

message CommandEndTxnResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}

message CommandEndTxnOnPartition {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional string topic = 4;
  optional TxnAction txn_action = 5;
  optional uint64 txnid_least_bits_of_low_watermark = 6;
}

message CommandEndTxnOnPartitionResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}

message CommandEndTxnOnSubscription {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional Subscription subscription = 4;
  optional TxnAction txn_action = 5;
}

message CommandEndTxnOnSubscriptionResponse {
  required uint64 request_id = 1;
  optional uint64 txnid_least_bits = 2 [default = 0];
  optional uint64 txnid_most_bits = 3 [default = 0];
  optional ServerError error = 4;
  optional string message = 5;
}

// Envelope for every command: `type` names the command kind and each
// optional field below carries one command message. Apart from `type`
// itself (field 1), each Type value equals the field number of its
// matching command field.
message BaseCommand {
  enum Type {
    CONNECT = 2;
    CONNECTED = 3;
    SUBSCRIBE = 4;

    PRODUCER = 5;

    SEND = 6;
    SEND_RECEIPT = 7;
    SEND_ERROR = 8;

    MESSAGE = 9;
    ACK = 10;
    FLOW = 11;

    UNSUBSCRIBE = 12;

    SUCCESS = 13;
    ERROR = 14;

    CLOSE_PRODUCER = 15;
    CLOSE_CONSUMER = 16;

    PRODUCER_SUCCESS = 17;

    PING = 18;
    PONG = 19;

    REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;

    PARTITIONED_METADATA = 21;
    PARTITIONED_METADATA_RESPONSE = 22;

    LOOKUP = 23;
    LOOKUP_RESPONSE = 24;

    CONSUMER_STATS = 25;
    CONSUMER_STATS_RESPONSE = 26;

    REACHED_END_OF_TOPIC = 27;

    SEEK = 28;

    GET_LAST_MESSAGE_ID = 29;
    GET_LAST_MESSAGE_ID_RESPONSE = 30;

    ACTIVE_CONSUMER_CHANGE = 31;

    GET_TOPICS_OF_NAMESPACE = 32;
    GET_TOPICS_OF_NAMESPACE_RESPONSE = 33;

    GET_SCHEMA = 34;
    GET_SCHEMA_RESPONSE = 35;

    AUTH_CHALLENGE = 36;
    AUTH_RESPONSE = 37;

    ACK_RESPONSE = 38;

    GET_OR_CREATE_SCHEMA = 39;
    GET_OR_CREATE_SCHEMA_RESPONSE = 40;

    // transaction related
    NEW_TXN = 50;
    NEW_TXN_RESPONSE = 51;

    ADD_PARTITION_TO_TXN = 52;
    ADD_PARTITION_TO_TXN_RESPONSE = 53;

    ADD_SUBSCRIPTION_TO_TXN = 54;
    ADD_SUBSCRIPTION_TO_TXN_RESPONSE = 55;

    END_TXN = 56;
    END_TXN_RESPONSE = 57;

    END_TXN_ON_PARTITION = 58;
    END_TXN_ON_PARTITION_RESPONSE = 59;

    END_TXN_ON_SUBSCRIPTION = 60;
    END_TXN_ON_SUBSCRIPTION_RESPONSE = 61;
  }

  required Type type = 1;

  optional CommandConnect connect = 2;
  optional CommandConnected connected = 3;

  optional CommandSubscribe subscribe = 4;
  optional CommandProducer producer = 5;
  optional CommandSend send = 6;
  optional CommandSendReceipt send_receipt = 7;
  optional CommandSendError send_error = 8;
  optional CommandMessage message = 9;
  optional CommandAck ack = 10;
  optional CommandFlow flow = 11;
  optional CommandUnsubscribe unsubscribe = 12;

  optional CommandSuccess success = 13;
  optional CommandError error = 14;

  optional CommandCloseProducer close_producer = 15;
  optional CommandCloseConsumer close_consumer = 16;

  optional CommandProducerSuccess producer_success = 17;
  optional CommandPing ping = 18;
  optional CommandPong pong = 19;
  optional CommandRedeliverUnacknowledgedMessages redeliverUnacknowledgedMessages = 20;

  optional CommandPartitionedTopicMetadata partitionMetadata = 21;
  optional CommandPartitionedTopicMetadataResponse partitionMetadataResponse = 22;

  optional CommandLookupTopic lookupTopic = 23;
  optional CommandLookupTopicResponse lookupTopicResponse = 24;

  optional CommandConsumerStats consumerStats = 25;
  optional CommandConsumerStatsResponse consumerStatsResponse = 26;

  optional CommandReachedEndOfTopic reachedEndOfTopic = 27;

  optional CommandSeek seek = 28;

  optional CommandGetLastMessageId getLastMessageId = 29;
  optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;

  optional CommandActiveConsumerChange active_consumer_change = 31;

  optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
  optional CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33;

  optional CommandGetSchema getSchema = 34;
  optional CommandGetSchemaResponse getSchemaResponse = 35;

  optional CommandAuthChallenge authChallenge = 36;
  optional CommandAuthResponse authResponse = 37;

  optional CommandAckResponse ackResponse = 38;

  optional CommandGetOrCreateSchema getOrCreateSchema = 39;
  optional CommandGetOrCreateSchemaResponse getOrCreateSchemaResponse = 40;

  // transaction related
  optional CommandNewTxn newTxn = 50;
  optional CommandNewTxnResponse newTxnResponse = 51;
  optional CommandAddPartitionToTxn addPartitionToTxn = 52;
  optional CommandAddPartitionToTxnResponse addPartitionToTxnResponse = 53;
  optional CommandAddSubscriptionToTxn addSubscriptionToTxn = 54;
  optional CommandAddSubscriptionToTxnResponse addSubscriptionToTxnResponse = 55;
  optional CommandEndTxn endTxn = 56;
  optional CommandEndTxnResponse endTxnResponse = 57;
  optional CommandEndTxnOnPartition endTxnOnPartition = 58;
  optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59;
  optional CommandEndTxnOnSubscription endTxnOnSubscription = 60;
  optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse = 61;
}