syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package source.v1;

// Source is the gRPC service exposed by a user defined source (UDSource).
service Source {
  // ReadFn returns a stream of datum responses.
  // The number of datum returned for a ReadRequest is less than or equal to
  // the num_records specified in that ReadRequest.
  // If the request timeout is reached on the server side, the returned
  // responses will contain all the datum that have been read so far (which
  // could be an empty list).
  // The server will continue to read and respond to subsequent ReadRequests
  // until the client closes the stream.
  // Once it has sent all the datum, the server will send a ReadResponse with
  // the end of transmission flag set to true.
  rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

  // AckFn acknowledges a stream of datum offsets.
  // When AckFn is called, it implicitly indicates that the datum stream has
  // been processed by the source vertex.
  // The caller (numa) expects AckFn to be successful, and it does not expect
  // any errors.
  // If there are irrecoverable errors when the callee (UDSource) is
  // processing the AckFn request, then it is best to crash because there are
  // no other retry mechanisms possible.
  // The client sends n requests and expects n responses.
  rpc AckFn(stream AckRequest) returns (stream AckResponse);

  // PendingFn returns the number of pending records at the user defined
  // source.
  rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);

  // PartitionsFn returns the list of partitions for the user defined source.
  rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse);

  // IsReady is the heartbeat endpoint for user defined source gRPC.
  rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
 * Handshake message between client and server to indicate the start of
 * transmission.
 */
message Handshake {
  // Required field indicating the start of transmission.
  bool sot = 1;
}

/*
 * ReadRequest is the request for reading datum stream from user defined
 * source.
 */
message ReadRequest {
  message Request {
    // Required field indicating the number of records to read.
    uint64 num_records = 1;

    // Required field indicating the request timeout in milliseconds.
    // uint32 can represent 2^32 milliseconds, which is about 49 days.
    // We don't use uint64 because time.Duration takes int64 as nanoseconds.
    // Using uint64 for milliseconds would cause overflow.
    uint32 timeout_in_ms = 2;
  }

  // Required field indicating the request.
  Request request = 1;

  // Handshake message between client and server to indicate the start of
  // transmission.
  optional Handshake handshake = 2;
}

/*
 * ReadResponse is the response for reading datum stream from user defined
 * source.
 */
message ReadResponse {
  message Result {
    // Required field holding the payload of the datum.
    bytes payload = 1;

    // Required field indicating the offset information of the datum.
    Offset offset = 2;

    // Required field representing the time associated with each datum.
    // It is used for watermarking.
    google.protobuf.Timestamp event_time = 3;

    // Optional list of keys associated with the datum.
    // Key is the "key" attribute in (key, value) as in the map-reduce
    // paradigm.
    // We add this optional field to support the use case where the user
    // defined source can provide keys for the datum.
    // e.g. Kafka and Redis Stream messages usually include information about
    // the keys.
    repeated string keys = 4;

    // Optional list of headers associated with the datum.
    // Headers are the metadata associated with the datum.
    // e.g. Kafka and Redis Stream messages usually include information about
    // the headers.
    map<string, string> headers = 5;
  }

  message Status {
    // Code to indicate the status of the response.
    enum Code {
      SUCCESS = 0;
      FAILURE = 1;
    }

    // Error to indicate the error type. If the code is FAILURE, then the
    // error field will be populated.
    enum Error {
      UNACKED = 0;
      OTHER = 1;
    }

    // End of transmission flag.
    bool eot = 1;

    // Status code of the response.
    Code code = 2;

    // Error type. Populated when code is FAILURE.
    optional Error error = 3;

    // Optional message accompanying the status.
    // NOTE(review): presumably a human-readable error description; confirm
    // against the server implementation.
    optional string msg = 4;
  }

  // Required field holding the result.
  Result result = 1;

  // Status of the response. Holds the end of transmission flag and the
  // status code.
  Status status = 2;

  // Handshake message between client and server to indicate the start of
  // transmission.
  optional Handshake handshake = 3;
}

/*
 * AckRequest is the request for acknowledging datum.
 * It takes a list of offsets to be acknowledged.
 */
message AckRequest {
  message Request {
    // Required field holding the offsets to be acked.
    repeated Offset offsets = 1;
  }

  // Required field holding the request. The list will be ordered and will
  // have the same order as the original Read response.
  Request request = 1;

  // Handshake message between client and server to indicate the start of
  // transmission.
  optional Handshake handshake = 2;
}

/*
 * AckResponse is the response for acknowledging datum. It contains one empty
 * field confirming the batch of offsets that have been successfully
 * acknowledged. The contract between client and server is that the server
 * will only return the AckResponse if the ack request is successful.
 * If the server hangs during the ack request, the client can decide to
 * timeout and error out the data forwarder.
 * The reason why we define such a contract is that we always expect the
 * server to be able to process the ack request.
 * The client is expected to send the AckRequest to the server with offsets
 * that strictly correspond to the previously read batch. If the client sends
 * an AckRequest with offsets that do not, it is considered a client error
 * and the server will not return the AckResponse.
 */
message AckResponse {
  message Result {
    // Required field indicating the ack request is successful.
    google.protobuf.Empty success = 1;
  }

  // Required field holding the result.
  Result result = 1;

  // Handshake message between client and server to indicate the start of
  // transmission.
  optional Handshake handshake = 2;
}

/*
 * ReadyResponse is the health check result for user defined source.
 */
message ReadyResponse {
  // Required field holding the health check result.
  bool ready = 1;
}

/*
 * PendingResponse is the response for the pending request.
 */
message PendingResponse {
  message Result {
    // Required field holding the number of pending records at the user
    // defined source.
    // A negative count indicates that the pending information is not
    // available.
    int64 count = 1;
  }

  // Required field holding the result.
  Result result = 1;
}

/*
 * PartitionsResponse is the response for the partitions request.
 */
message PartitionsResponse {
  message Result {
    // Required field holding the list of partitions.
    repeated int32 partitions = 1;
  }

  // Required field holding the result.
  Result result = 1;
}

/*
 * Offset is the offset of the datum.
 */
message Offset {
  // offset is the offset of the datum. This field is required.
  // We define Offset as a byte array because different input data sources
  // can have different representations for Offset.
  // The only way to generalize it is to define it as a byte array, such that
  // the UDSource can de-serialize the offset using its own interpretation
  // logic.
  bytes offset = 1;

  // Optional partition_id indicates which partition of the source the datum
  // belongs to.
  // It is useful for sources that have multiple partitions, e.g. Kafka.
  // If the partition_id is not specified, it is assumed that the source has
  // a single partition.
  int32 partition_id = 2;
}