// ---------------------------------------------------------------------------
// HStreamDB public APIs
// ---------------------------------------------------------------------------

syntax = "proto3";

package hstream.server;

import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

option java_package = "io.hstream.internal";
option java_multiple_files = true;
option java_outer_classname = "HStreamProto";
option go_package = "hstreamdb/hstream/server";

// gRPC service exposing the HStreamDB public API. RPCs are grouped into
// "stable" and "developing" sections, mirroring the section banners below.
service HStreamApi {
  // ---------------------------------------------------------------------------
  // Stable APIs
  // ---------------------------------------------------------------------------

  rpc Echo(EchoRequest) returns (EchoResponse);

  // Stream APIs
  rpc CreateStream(Stream) returns (Stream) {}
  rpc DeleteStream(DeleteStreamRequest) returns (google.protobuf.Empty) {}
  rpc GetStream(GetStreamRequest) returns (GetStreamResponse) {}
  rpc ListStreams(ListStreamsRequest) returns (ListStreamsResponse) {}
  rpc ListStreamsWithPrefix(ListStreamsWithPrefixRequest)
      returns (ListStreamsResponse) {}
  rpc LookupShard(LookupShardRequest) returns (LookupShardResponse) {}
  rpc Append(AppendRequest) returns (AppendResponse) {}

  // Shard APIs
  rpc ListShards(ListShardsRequest) returns (ListShardsResponse) {}
  rpc CreateShardReader(CreateShardReaderRequest)
      returns (CreateShardReaderResponse) {}
  rpc LookupShardReader(LookupShardReaderRequest)
      returns (LookupShardReaderResponse) {}
  rpc ReadShard(ReadShardRequest) returns (ReadShardResponse) {}
  rpc ListShardReaders(ListShardReadersRequest)
      returns (ListShardReadersResponse) {}
  rpc DeleteShardReader(DeleteShardReaderRequest)
      returns (google.protobuf.Empty) {}

  // Subscribe APIs
  rpc CreateSubscription(Subscription) returns (Subscription) {}
  rpc GetSubscription(GetSubscriptionRequest)
      returns (GetSubscriptionResponse) {}
  rpc ListSubscriptions(ListSubscriptionsRequest)
      returns (ListSubscriptionsResponse) {}
  rpc ListSubscriptionsWithPrefix(ListSubscriptionsWithPrefixRequest)
      returns (ListSubscriptionsResponse) {}
  rpc ListConsumers(ListConsumersRequest) returns (ListConsumersResponse) {}
  rpc CheckSubscriptionExist(CheckSubscriptionExistRequest)
      returns (CheckSubscriptionExistResponse) {}
  rpc DeleteSubscription(DeleteSubscriptionRequest)
      returns (google.protobuf.Empty) {}
  rpc LookupSubscription(LookupSubscriptionRequest)
      returns (LookupSubscriptionResponse) {}
  // Bidirectional stream: the client sends StreamingFetchRequest messages and
  // the server sends StreamingFetchResponse messages.
  rpc StreamingFetch(stream StreamingFetchRequest)
      returns (stream StreamingFetchResponse) {}

  // Cluster APIs
  rpc DescribeCluster(google.protobuf.Empty)
      returns (DescribeClusterResponse) {}
  rpc LookupResource(LookupResourceRequest) returns (ServerNode) {}

  // Admin Command
  rpc SendAdminCommand(AdminCommandRequest) returns (AdminCommandResponse);

  // ---------------------------------------------------------------------------
  // Developing APIs
  // ---------------------------------------------------------------------------

  // Stats
  rpc PerStreamTimeSeriesStats(PerStreamTimeSeriesStatsRequest)
      returns (PerStreamTimeSeriesStatsResponse);
  rpc PerStreamTimeSeriesStatsAll(PerStreamTimeSeriesStatsAllRequest)
      returns (PerStreamTimeSeriesStatsAllResponse);
  rpc GetStreamStats(GetStreamStatsRequest) returns (GetStreamStatsResponse);
  rpc GetSubscriptionStats(GetSubscriptionStatsRequest)
      returns (GetSubscriptionStatsResponse);

  // only for push query
  // e.g., select (with emit changes)
  rpc ExecutePushQuery(CommandPushQuery)
      returns (stream google.protobuf.Struct);

  // for execute any sql stmt except push query,
  // e.g., insert, create, show/list, select(without emit changes) ...
  rpc ExecuteQuery(CommandQuery) returns (CommandQueryResponse);

  // query related apis
  rpc CreateQuery(CreateQueryRequest) returns (Query) {}
  rpc ListQueries(ListQueriesRequest) returns (ListQueriesResponse) {}
  rpc GetQuery(GetQueryRequest) returns (Query) {}
  rpc TerminateQueries(TerminateQueriesRequest)
      returns (TerminateQueriesResponse) {}
  rpc DeleteQuery(DeleteQueryRequest) returns (google.protobuf.Empty) {}
  rpc RestartQuery(RestartQueryRequest) returns (google.protobuf.Empty) {}

  // connector related apis
  rpc CreateConnector(CreateConnectorRequest) returns (Connector) {}
  rpc ListConnectors(ListConnectorsRequest)
      returns (ListConnectorsResponse) {}
  rpc GetConnector(GetConnectorRequest) returns (Connector) {}
  rpc DeleteConnector(DeleteConnectorRequest)
      returns (google.protobuf.Empty) {}
  rpc PauseConnector(PauseConnectorRequest)
      returns (google.protobuf.Empty) {}
  rpc ResumeConnector(ResumeConnectorRequest)
      returns (google.protobuf.Empty) {}
  rpc LookupConnector(LookupConnectorRequest)
      returns (LookupConnectorResponse) {}

  // view related apis
  rpc ListViews(ListViewsRequest) returns (ListViewsResponse) {}
  rpc GetView(GetViewRequest) returns (View) {}
  rpc DeleteView(DeleteViewRequest) returns (google.protobuf.Empty) {}

  // overview related apis
  // rpc GetOverview(GetOverviewRequest) returns (GetOverviewResponse) {}

  // FIXME: (store)node related apis
  rpc ListNodes(ListNodesRequest) returns (ListNodesResponse) {}
  rpc GetNode(GetNodeRequest) returns (Node) {}
}

// Symbolic positions usable in place of a concrete record offset.
enum SpecialOffset {
  EARLIEST = 0;
  LATEST = 1;
}

// A position within a shard: either a symbolic offset or a concrete RecordId.
message ShardOffset {
  oneof offset {
    SpecialOffset specialOffset = 1;
    RecordId recordOffset = 2;
  }
}

// Request of the Echo RPC.
message EchoRequest {
  string msg = 1;
}

// Response of the Echo RPC.
message EchoResponse {
  string msg = 1;
}

message CommandStreamTask {
  string command_sql = 1;
}

message CommandStreamTaskResponse {
  string command_resp = 1;
}

message CommandConnect {
  string client_version = 1;
  int32 protocol_version = 2;
}

message CommandConnected {
  string server_version = 1;
  int32 protocol_version = 2;
}

// Request of the ExecutePushQuery RPC.
message CommandPushQuery {
  string query_text = 1;
}

// Request of the ExecuteQuery RPC.
message CommandQuery {
  string stmt_text = 1;
}

// Response of the ExecuteQuery RPC.
message CommandQueryResponse {
  repeated google.protobuf.Struct result_set = 1;
}

// Request of the GetStream RPC.
message GetStreamRequest {
  string name = 1;
}

// Response of the GetStream RPC.
message GetStreamResponse {
  Stream stream = 1;
}

// Request of the Append RPC.
message AppendRequest {
  string streamName = 1;
  uint64 shardId = 2;
  BatchedRecord records = 3;
}

// Response of the Append RPC.
message AppendResponse {
  string streamName = 1;
  uint64 shardId = 2;
  repeated RecordId recordIds = 3;
}

// A subscription; used as both request and response of CreateSubscription.
message Subscription {
  string subscriptionId = 1;
  string streamName = 2;
  int32 ackTimeoutSeconds = 3;
  int32 maxUnackedRecords = 4;
  SpecialOffset offset = 5;
  google.protobuf.Timestamp creationTime = 6;
}

message SubscriptionOffset {
  uint64 shardId = 1;
  uint64 batchId = 2;
}

// Request of the GetSubscription RPC.
message GetSubscriptionRequest {
  string id = 1;
}

// Response of the GetSubscription RPC.
message GetSubscriptionResponse {
  Subscription subscription = 1;
  repeated SubscriptionOffset offsets = 2;
}

// Request of the DeleteSubscription RPC.
message DeleteSubscriptionRequest {
  string subscriptionId = 1;
  bool force = 2;
}

// Request of the CheckSubscriptionExist RPC.
message CheckSubscriptionExistRequest {
  string subscriptionId = 1;
}

// Response of the CheckSubscriptionExist RPC.
message CheckSubscriptionExistResponse {
  bool exists = 1;
}

// Client-to-server message of the StreamingFetch RPC.
message StreamingFetchRequest {
  string subscriptionId = 1;
  string consumerName = 2;
  repeated RecordId ackIds = 3;
}

// Server-to-client message of the StreamingFetch RPC.
message StreamingFetchResponse {
  ReceivedRecord receivedRecords = 1;
}

// A batch of records together with the ids of the records it carries.
message ReceivedRecord {
  repeated RecordId recordIds = 1;
  BatchedRecord record = 2;
}

// Request of the DeleteStream RPC.
message DeleteStreamRequest {
  string streamName = 1;
  bool ignoreNonExist = 2;
  bool force = 3;
}

// Response of the ListStreams and ListStreamsWithPrefix RPCs.
message ListStreamsResponse {
  repeated Stream streams = 1;
}

// TODO: Add options for filtering or max_size
message ListStreamsRequest {}

// Request of the ListStreamsWithPrefix RPC.
message ListStreamsWithPrefixRequest {
  string prefix = 1;
}

// Request of the ListSubscriptions RPC.
message ListSubscriptionsRequest {}

// Request of the ListSubscriptionsWithPrefix RPC.
message ListSubscriptionsWithPrefixRequest {
  string prefix = 1;
}

// Response of the ListSubscriptions and ListSubscriptionsWithPrefix RPCs.
message ListSubscriptionsResponse {
  repeated Subscription subscription = 1;
}

// Request of the ListConsumers RPC.
message ListConsumersRequest {
  string subscriptionId = 1;
}

// Response of the ListConsumers RPC.
message ListConsumersResponse {
  repeated Consumer consumers = 1;
}

// A stream; used as both request and response of CreateStream.
message Stream {
  string streamName = 1;
  uint32 replicationFactor = 2;
  // NOTE(review): the time unit of backlogDuration is not stated in this
  // file -- confirm and document it.
  uint32 backlogDuration = 3;
  uint32 shardCount = 4;
  google.protobuf.Timestamp creationTime = 5;
}

enum CompressionType {
  None = 0;
  Gzip = 1;
  Zstd = 2;
}

message BatchedRecord {
  CompressionType compressionType = 1;

  // Required.
  // The time at which the message was published,
  // populated by the server.
  google.protobuf.Timestamp publishTime = 2;

  // Required
  // batchSize records the number of compressed HStreamRecords
  // in the current batch. This field allows the server to build
  // the RecordId correctly without decompression after reading
  // the data from the store.
  uint32 batchSize = 3;

  // serialized BatchHStreamRecords
  bytes payload = 4;
}

message HStreamRecord {
  // Required.
  HStreamRecordHeader header = 1;

  // Optional.
  // Payload may be empty.
  bytes payload = 2;
}

// Auxiliary types for serialization [HStreamRecord]
message BatchHStreamRecords {
  repeated HStreamRecord records = 1;
}

message HStreamRecordHeader {
  // Required.
  // Flag for payload.
  enum Flag {
    JSON = 0;
    RAW = 1;
  }
  Flag flag = 1;

  // Optional.
  // Attributes attached to this record.
  // NOTE(review): the <key, value> type parameters were missing from this
  // declaration and have been reconstructed as <string, string> -- confirm
  // against the generated code in use.
  map<string, string> attributes = 2;

  // Optional.
  // key for the message.
  string key = 3;
}

message RecordId {
  uint64 shardId = 1;
  // LSN of the total batch records
  uint64 batchId = 2;
  // Offset of each record in the batch
  uint32 batchIndex = 3;
}

message Shard {
  string streamName = 1;
  uint64 shardId = 2;
  string startHashRangeKey = 3;
  string endHashRangeKey = 4;
  uint64 epoch = 5;
  bool isActive = 6;
}

// Request of the ListShards RPC.
message ListShardsRequest {
  string streamName = 1;
}

// Response of the ListShards RPC.
message ListShardsResponse {
  repeated Shard shards = 1;
}

// Request of the CreateShardReader RPC.
message CreateShardReaderRequest {
  string streamName = 1;
  uint64 shardId = 2;
  ShardOffset shardOffset = 3;
  string readerId = 4;
  // NOTE(review): the unit of timeout is not stated in this file -- confirm.
  uint32 timeout = 5;
}

// Response of the CreateShardReader RPC; carries the same fields as the
// request.
message CreateShardReaderResponse {
  string streamName = 1;
  uint64 shardId = 2;
  ShardOffset shardOffset = 3;
  string readerId = 4;
  // NOTE(review): the unit of timeout is not stated in this file -- confirm.
  uint32 timeout = 5;
}

// Request of the ReadShard RPC.
message ReadShardRequest {
  string readerId = 1;
  uint32 maxRecords = 2;
}

// Request of the DeleteShardReader RPC.
message DeleteShardReaderRequest {
  string readerId = 1;
}

// Response of the ReadShard RPC.
message ReadShardResponse {
  repeated ReceivedRecord receivedRecords = 1;
}

// Request of the ListShardReaders RPC.
message ListShardReadersRequest {}

// Response of the ListShardReaders RPC.
message ListShardReadersResponse {
  repeated string readerId = 1;
}

// Request of the TerminateQueries RPC.
message TerminateQueriesRequest {
  repeated string queryId = 1;
  bool all = 2;
}

// Response of the TerminateQueries RPC.
message TerminateQueriesResponse {
  repeated string queryId = 1;
}

// Query related Request and Response

// Request of the CreateQuery RPC.
message CreateQueryRequest {
  string sql = 1;
}

// Request of the ListQueries RPC.
message ListQueriesRequest {}

// Response of the ListQueries RPC.
message ListQueriesResponse {
  repeated Query queries = 1;
}

// Request of the GetQuery RPC.
message GetQueryRequest {
  string id = 1;
}

// Task status reported in the status field of Query and View.
enum TaskStatusPB {
  TASK_CREATING = 0;
  TASK_CREATED = 1;
  TASK_RUNNING = 2;
  TASK_CREATION_ABORT = 3;
  TASK_ABORT = 4;
  TASK_TERMINATED = 5;
  TASK_UNKNOWN = 6;
}

message Consumer {
  string name = 1;
  string uri = 2;
  string userAgent = 3;
}

message Query {
  string id = 1;
  TaskStatusPB status = 2;
  // NOTE(review): the epoch and unit of createdTime are not stated in this
  // file -- confirm.
  int64 createdTime = 3;
  string queryText = 4;
}

// Request of the DeleteQuery RPC.
message DeleteQueryRequest {
  string id = 1;
}

message DeleteQueryResponse {
  bool success = 1;
}

// Request of the RestartQuery RPC.
message RestartQueryRequest {
  string id = 1;
}

message RestartQueryResponse {
  bool success = 1;
}

// Connector Related Request and Response

// Request of the CreateConnector RPC.
message CreateConnectorRequest {
  string sql = 1;
}

// Request of the ListConnectors RPC.
message ListConnectorsRequest {}

// Response of the ListConnectors RPC.
message ListConnectorsResponse {
  repeated Connector connectors = 1;
}

// Request of the GetConnector RPC.
message GetConnectorRequest {
  string name = 1;
}

message Connector {
  google.protobuf.Struct info = 1;
}

// Request of the DeleteConnector RPC.
message DeleteConnectorRequest {
  string name = 1;
}

// Request of the PauseConnector RPC.
message PauseConnectorRequest {
  string name = 1;
}

// Request of the ResumeConnector RPC.
message ResumeConnectorRequest {
  string name = 1;
}

// View related Request and Response

// Request of the GetView RPC.
message GetViewRequest {
  string viewId = 1;
}

// Request of the DeleteView RPC.
message DeleteViewRequest {
  string viewId = 1;
  bool ignoreNonExist = 2;
}

// Request of the ListViews RPC.
message ListViewsRequest {}

// Response of the ListViews RPC.
message ListViewsResponse {
  repeated View views = 1;
}

message View {
  string viewId = 1;
  TaskStatusPB status = 2;
  // NOTE(review): the epoch and unit of createdTime are not stated in this
  // file -- confirm.
  int64 createdTime = 3;
  string sql = 4;
  repeated string schema = 5;
}

// Nodes related Request and Response

// Request of the GetNode RPC.
message GetNodeRequest {
  int32 id = 1;
}

// Request of the ListNodes RPC.
message ListNodesRequest {}

// Response of the ListNodes RPC.
message ListNodesResponse {
  repeated Node nodes = 1;
}

message Node {
  int32 id = 1;
  repeated int32 roles = 2;
  string address = 3;
  string status = 4;
}

message StatsIntervalVals {
  repeated int32 intervals = 1;
}

message StatsDoubleVals {
  repeated double vals = 1;
}

// Request of the SendAdminCommand RPC.
message AdminCommandRequest {
  string command = 1;
}

// Response of the SendAdminCommand RPC.
message AdminCommandResponse {
  string result = 1;
}

// Request of the PerStreamTimeSeriesStats RPC.
message PerStreamTimeSeriesStatsRequest {
  string method = 1;
  string streamName = 2;
  StatsIntervalVals intervals = 3;
}

// Response of the PerStreamTimeSeriesStats RPC.
message PerStreamTimeSeriesStatsResponse {
  StatsDoubleVals stats = 1;
}

// Response of the PerStreamTimeSeriesStatsAll RPC.
message PerStreamTimeSeriesStatsAllResponse {
  // NOTE(review): the <key, value> type parameters were missing from this
  // declaration and have been reconstructed as <string, StatsDoubleVals>
  // (the value type matches the single-stream response) -- confirm against
  // the generated code in use.
  map<string, StatsDoubleVals> stats = 1;
}

// Request of the PerStreamTimeSeriesStatsAll RPC.
message PerStreamTimeSeriesStatsAllRequest {
  string method = 1;
  StatsIntervalVals intervals = 2;
}

// ---------------------------------------------------------------------------
// Messages for Cluster
// ---------------------------------------------------------------------------

// Response of the DescribeCluster RPC.
message DescribeClusterResponse {
  string protocolVersion = 1;
  string serverVersion = 2;
  repeated ServerNode serverNodes = 3;
  repeated ServerNodeStatus serverNodesStatus = 4;
  // NOTE(review): the unit of clusterUpTime is not stated in this file --
  // confirm.
  uint64 clusterUpTime = 5;
}

message ServerNode {
  uint32 id = 1;
  string host = 2;
  uint32 port = 3;
}

enum NodeState {
  Starting = 0;
  Running = 1;
  Unavailable = 2;
  Dead = 3;
}

message ServerNodeStatus {
  ServerNode node = 1;
  NodeState state = 2;
}

// Request of the LookupShard RPC.
message LookupShardRequest {
  uint64 shardId = 1;
}

// Response of the LookupShard RPC.
message LookupShardResponse {
  uint64 shardId = 1;
  ServerNode serverNode = 2;
}

// Request of the LookupSubscription RPC.
message LookupSubscriptionRequest {
  string subscriptionId = 1;
}

// Response of the LookupSubscription RPC.
message LookupSubscriptionResponse {
  string subscriptionId = 1;
  ServerNode serverNode = 2;
}

// Request of the LookupConnector RPC.
message LookupConnectorRequest {
  string name = 1;
}

// Response of the LookupConnector RPC.
message LookupConnectorResponse {
  string name = 1;
  ServerNode serverNode = 2;
}

// Request of the LookupShardReader RPC.
message LookupShardReaderRequest {
  string readerId = 1;
}

// Response of the LookupShardReader RPC.
message LookupShardReaderResponse {
  string readerId = 1;
  ServerNode serverNode = 2;
}

// Kinds of resource that can be named in a LookupResourceRequest.
enum ResourceType {
  ResStream = 0;
  ResSubscription = 1;
  ResShard = 2;
  ResShardReader = 3;
  ResConnector = 4;
  ResQuery = 5;
  ResView = 6;
}

// Request of the LookupResource RPC.
message LookupResourceRequest {
  ResourceType resType = 1;
  string resId = 2;
}

// Statistic kinds selectable in a GetStreamStatsRequest.
enum StreamStats {
  AppendInBytes = 0;
  AppendInRecords = 1;
  TotalAppend = 2;
  FailedAppend = 3;
}

// Request of the GetStreamStats RPC.
message GetStreamStatsRequest {
  StreamStats statsType = 1;
}

// Response of the GetStreamStats RPC.
message GetStreamStatsResponse {
  // NOTE(review): the <key, value> type parameters were missing from this
  // declaration and have been reconstructed as <string, int64> -- confirm
  // against the generated code in use.
  map<string, int64> statValues = 1;
}

// Statistic kinds selectable in a GetSubscriptionStatsRequest.
enum SubscriptionStats {
  DeliveryInBytes = 0;
  DeliveryInRecords = 1;
  AckReceived = 2;
  ResendRecords = 3;
  MessageRequestCount = 4;
  MessageResponseCount = 5;
}

// Request of the GetSubscriptionStats RPC.
message GetSubscriptionStatsRequest {
  SubscriptionStats statsType = 1;
}

// Response of the GetSubscriptionStats RPC.
message GetSubscriptionStatsResponse {
  // NOTE(review): the <key, value> type parameters were missing from this
  // declaration and have been reconstructed as <string, int64> -- confirm
  // against the generated code in use.
  map<string, int64> statValues = 1;
}