syntax = "proto3";

package proto;

option java_package = "io.liftbridge.proto";

// NullableInt64 wraps an int64 in a message. Message-typed fields have
// explicit presence, so an unset value can be distinguished from 0.
message NullableInt64 {
  int64 value = 1;
}

// NullableInt32 wraps an int32 in a message. Message-typed fields have
// explicit presence, so an unset value can be distinguished from 0.
message NullableInt32 {
  int32 value = 1;
}

// NullableBool wraps a bool in a message. Message-typed fields have explicit
// presence, so an unset value can be distinguished from false.
message NullableBool {
  bool value = 1;
}

// CreateStreamRequest is sent to create a new stream.
message CreateStreamRequest {
  // Stream NATS subject
  string subject = 1;
  // Stream name (unique per subject)
  string name = 2;
  // Partitions NATS subject amongst group members
  string group = 3;
  // Number of stream replicas
  int32 replicationFactor = 4;
  // Number of stream partitions
  int32 partitions = 5;
  // The maximum size a stream's log can grow to, in bytes.
  NullableInt64 RetentionMaxBytes = 6;
  // The maximum size a stream's log can grow to, in messages
  NullableInt64 RetentionMaxMessages = 7;
  // The TTL for stream log segment files
  NullableInt64 RetentionMaxAge = 8;
  // The frequency to check for log cleaner
  NullableInt64 CleanerInterval = 9;
  // The maximum size of a single stream log segment file in bytes
  NullableInt64 SegmentMaxBytes = 10;
  // The maximum time before a new stream log segment is rolled out
  NullableInt64 SegmentMaxAge = 11;
  // The maximum number of concurrent goroutines to use for compaction on a
  // stream log
  NullableInt32 CompactMaxGoroutines = 12;
  // CompactEnabled controls compaction for a stream.
  NullableBool CompactEnabled = 13;
}

// CreateStreamResponse is sent by server after creating a stream.
message CreateStreamResponse {
  // Intentionally empty.
}

// DeleteStreamRequest is sent to delete a stream.
message DeleteStreamRequest {
  // Stream name
  string name = 1;
}

// DeleteStreamResponse is sent by server after deleting a stream.
message DeleteStreamResponse {
  // Intentionally empty.
}

// PauseStreamRequest is sent to pause the specified stream partitions. If no
// partitions are specified, this will pause all partitions in the stream.
// Partitions are resumed when they are published to via the Liftbridge Publish
// API.
message PauseStreamRequest {
  // Stream name
  string name = 1;
  // Stream partitions
  repeated int32 partitions = 2;
  // Resume all partitions at once
  bool resumeAll = 3;
}

// PauseStreamResponse is sent by server after pausing a stream.
message PauseStreamResponse {
  // Intentionally empty.
}

// StartPosition determines the start-position type on a subscription.
enum StartPosition {
  // Start at new messages after the latest
  NEW_ONLY = 0;
  // Start at a specified offset
  OFFSET = 1;
  // Start at the oldest message
  EARLIEST = 2;
  // Start at the newest message
  LATEST = 3;
  // Start at a specified timestamp
  TIMESTAMP = 4;
}

// SubscribeRequest is sent to subscribe to a stream partition.
message SubscribeRequest {
  // Stream name to subscribe to
  string stream = 1;
  // Stream partition to subscribe to
  int32 partition = 2;
  // Where to begin consuming from
  StartPosition startPosition = 3;
  // Offset to begin consuming from
  int64 startOffset = 4 [jstype = JS_STRING];
  // Timestamp to begin consuming from
  int64 startTimestamp = 5 [jstype = JS_STRING];
  // Subscribe from a random ISR replica instead of explicitly choosing the
  // leader
  bool readISRReplica = 6;
}

// FetchMetadataRequest is sent to retrieve the latest cluster metadata.
message FetchMetadataRequest {
  // The streams to fetch metadata for (all if empty)
  repeated string streams = 1;
}

// FetchMetadataResponse contains the cluster metadata requested.
message FetchMetadataResponse {
  // Information for all brokers
  repeated Broker brokers = 1;
  // Information for all streams
  repeated StreamMetadata metadata = 2;
}

// PublishRequest is sent to publish a new message to a stream.
message PublishRequest {
  // Message key
  bytes key = 1;
  // Message payload
  bytes value = 2;
  // Stream name to publish to
  string stream = 3;
  // Stream partition to publish to
  int32 partition = 4;
  // Message headers
  // NOTE(review): the map type arguments were missing from this declaration;
  // <string, bytes> is assumed. Confirm against the generated code before
  // regenerating stubs.
  map<string, bytes> headers = 5;
  // NATS subject to publish acks to
  string ackInbox = 6;
  // User-supplied value to correlate acks to publishes
  string correlationId = 7;
  // Controls the behavior of acks
  AckPolicy ackPolicy = 8;
}

// PublishResponse is sent by the server after publishing a message to a
// stream.
message PublishResponse {
  // The ack for the published message if AckPolicy was not NONE
  Ack ack = 1;
}

// PublishToSubjectRequest is sent to publish a Liftbridge message to a NATS
// subject.
message PublishToSubjectRequest {
  // Message key
  bytes key = 1;
  // Message payload
  bytes value = 2;
  // NATS subject to publish to
  string subject = 3;
  // Message headers
  // NOTE(review): the map type arguments were missing from this declaration;
  // <string, bytes> is assumed. Confirm against the generated code before
  // regenerating stubs.
  map<string, bytes> headers = 4;
  // NATS subject to publish acks to
  string ackInbox = 5;
  // User-supplied value to correlate acks to publishes
  string correlationId = 6;
  // Controls the behavior of acks
  AckPolicy ackPolicy = 7;
}

// PublishToSubjectResponse is sent by the server after publishing a message to
// a NATS subject.
message PublishToSubjectResponse {
  // The ack for the published message if AckPolicy was not NONE
  Ack ack = 1;
}

// Broker contains information for a Liftbridge broker.
message Broker {
  // Broker id
  string id = 1;
  // Broker host
  string host = 2;
  // Broker port
  int32 port = 3;
}

// StreamMetadata contains information for a stream.
message StreamMetadata {
  // Error describes what, if anything, was wrong with the requested stream.
  enum Error {
    OK = 0;
    UNKNOWN_STREAM = 1;
  }

  // The name of the stream being described
  string name = 1;
  // The stream subject
  string subject = 2;
  // Indicates if there was something wrong with the requested stream
  Error error = 3;
  // Information for the stream partitions
  // NOTE(review): the map type arguments were missing from this declaration;
  // <int32, PartitionMetadata> is assumed (presumably keyed by partition id,
  // which is an int32). Confirm against the generated code.
  map<int32, PartitionMetadata> partitions = 4;
}

// PartitionMetadata contains information for a stream partition.
message PartitionMetadata {
  // Partition id
  int32 id = 1;
  // Broker id of the partition leader
  string leader = 2;
  // Broker ids of the partition replicas
  repeated string replicas = 3;
  // Broker ids of the in-sync replica set
  repeated string isr = 4;
}

// AckPolicy controls the behavior of message acknowledgements.
enum AckPolicy {
  // The ack will be sent once the leader has written the message to its log
  LEADER = 0;
  // The ack will be sent after the ISR replicas have written the message to
  // their logs
  ALL = 1;
  // No ack will be sent
  NONE = 2;
}

// Message represents a message from a stream.
message Message {
  // Monotonic message offset in the stream
  int64 offset = 1 [jstype = JS_STRING];
  // Message key
  bytes key = 2;
  // Message payload
  bytes value = 3;
  // When the message was received by the broker
  int64 timestamp = 4 [jstype = JS_STRING];
  // Stream name message was received on
  string stream = 5;
  // Stream partition message was assigned to
  int32 partition = 6;
  // NATS subject message was received on
  string subject = 7;
  // NATS reply subject
  string replySubject = 8;
  // Message headers
  // NOTE(review): the map type arguments were missing from this declaration;
  // <string, bytes> is assumed. Confirm against the generated code before
  // regenerating stubs.
  map<string, bytes> headers = 9;
  // NATS subject to publish acks to
  string ackInbox = 10;
  // User-supplied value to correlate acks to publishes
  string correlationId = 11;
  // Controls the behavior of acks
  AckPolicy ackPolicy = 12;
}

// Ack represents an acknowledgement that a message was committed to a stream
// partition.
message Ack {
  // Name of the stream
  string stream = 1;
  // NATS subject partition is attached to
  string partitionSubject = 2;
  // NATS subject the message was received on
  string msgSubject = 3;
  // Stream offset the message was committed to
  int64 offset = 4 [jstype = JS_STRING];
  // NATS subject to publish acks to
  string ackInbox = 5;
  // User-supplied value from the message
  string correlationId = 6;
  // The AckPolicy sent on the message
  AckPolicy ackPolicy = 7;
}

// ActivityStreamOp represents an activity stream operation.
enum ActivityStreamOp {
  CREATE_STREAM = 0;
  DELETE_STREAM = 1;
  PAUSE_STREAM = 2;
  RESUME_STREAM = 3;
}

// CreateStreamOp represents a stream creation operation.
message CreateStreamOp {
  string stream = 1;
  repeated int32 partitions = 2;
}

// DeleteStreamOp represents a stream deletion operation.
message DeleteStreamOp {
  string stream = 1;
}

// PauseStreamOp represents a stream pause operation.
message PauseStreamOp {
  string stream = 1;
  repeated int32 partitions = 2;
  bool resumeAll = 3;
}

// ResumeStreamOp represents a stream resume operation.
message ResumeStreamOp {
  string stream = 1;
  repeated int32 partitions = 2;
}

// ActivityStreamEvent represents an entry into the activity stream.
message ActivityStreamEvent {
  uint64 id = 1;
  ActivityStreamOp op = 2;
  CreateStreamOp createStreamOp = 3;
  DeleteStreamOp deleteStreamOp = 4;
  PauseStreamOp pauseStreamOp = 5;
  ResumeStreamOp resumeStreamOp = 6;
}

// API is the main Liftbridge server interface clients interact with.
service API {
  // CreateStream creates a new stream attached to a NATS subject. It returns
  // an AlreadyExists status code if a stream with the given subject and name
  // already exists.
  rpc CreateStream(CreateStreamRequest) returns (CreateStreamResponse) {}

  // DeleteStream deletes a stream.
  rpc DeleteStream(DeleteStreamRequest) returns (DeleteStreamResponse) {}

  // PauseStream pauses a stream's partitions. If no partitions are
  // specified, all of the stream's partitions will be paused. Partitions are
  // resumed when they are published to via the Liftbridge Publish API.
  rpc PauseStream(PauseStreamRequest) returns (PauseStreamResponse) {}

  // Subscribe creates an ephemeral subscription for the given stream. It
  // begins to receive messages starting at the given offset and waits for
  // new messages when it reaches the end of the stream. Use the request
  // context to close the subscription.
  rpc Subscribe(SubscribeRequest) returns (stream Message) {}

  // FetchMetadata retrieves the latest cluster metadata, including stream
  // broker information.
  rpc FetchMetadata(FetchMetadataRequest) returns (FetchMetadataResponse) {}

  // Publish a new message to a stream. If the AckPolicy is not NONE and a
  // deadline is provided, this will synchronously block until the ack is
  // received. If the ack is not received in time, a DeadlineExceeded status
  // code is returned.
  rpc Publish(PublishRequest) returns (PublishResponse) {}

  // Publish a Liftbridge message to a NATS subject. If the AckPolicy is not
  // NONE and a deadline is provided, this will synchronously block until the
  // first ack is received. If an ack is not received in time, a
  // DeadlineExceeded status code is returned.
  rpc PublishToSubject(PublishToSubjectRequest)
      returns (PublishToSubjectResponse) {}
}