syntax = "proto3";

package oomagent;

option go_package = "/codegen";

// `OomAgent` is the gRPC service of oomagent. It covers online feature
// reads, offline-to-online sync, import, stream push, snapshot,
// point-in-time join, export, and health checking.
service OomAgent {
  // Get online features for an entity.
  rpc OnlineGet(OnlineGetRequest) returns (OnlineGetResponse) {}

  // Get online features for multiple entities.
  rpc OnlineMultiGet(OnlineMultiGetRequest) returns (OnlineMultiGetResponse) {}

  // Sync a certain revision of batch features from offline to online store.
  rpc Sync(SyncRequest) returns (SyncResponse) {}

  // Import features from external (batch and stream) data sources to
  // offline store through channels.
  rpc ChannelImport(stream ChannelImportRequest) returns (ImportResponse) {}

  // Import features from external (batch and stream) data sources to
  // offline store through files.
  rpc Import(ImportRequest) returns (ImportResponse) {}

  // Push stream features from stream data source to both offline and
  // online stores.
  rpc Push(PushRequest) returns (PushResponse) {}

  // Take snapshot for a stream feature group in offline store.
  rpc Snapshot(SnapshotRequest) returns (SnapshotResponse) {}

  // Point-in-Time Join features against labeled entity rows through
  // channels.
  rpc ChannelJoin(stream ChannelJoinRequest)
      returns (stream ChannelJoinResponse) {}

  // Point-in-Time Join features against labeled entity rows through files.
  rpc Join(JoinRequest) returns (JoinResponse) {}

  // Export certain features to a channel.
  rpc ChannelExport(ChannelExportRequest)
      returns (stream ChannelExportResponse) {}

  // Export certain features to a file.
  rpc Export(ExportRequest) returns (ExportResponse) {}

  // Check if oomagent is ready to serve requests.
  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse) {}
}

// `Value` represents a dynamically typed value which can be either
// an int64, a double, a string, a bool, a unix milliseconds, or a
// bytes. A producer of value is expected to set one of these
// variants.
message Value {
  oneof value {
    // Represents an int64 value.
    int64 int64 = 1;

    // Represents a double value.
    double double = 2;

    // Represents a string value.
    string string = 3;

    // Represents a boolean value.
    bool bool = 4;

    // Represents a unix milliseconds value.
    int64 unix_milli = 5;

    // Represents a bytes value.
    bytes bytes = 6;
  }
}

// `OnlineGetRequest` is a request for `OnlineGet` API.
message OnlineGetRequest {
  // An entity identifier, could be device ID, user ID, etc.
  string entity_key = 1;

  // A list of feature full names.
  // A feature full name has the following format: <group>.<feature>,
  // for example, txn_stats.count_7d
  repeated string features = 2;

  // All features of the group will be acquired if not null.
  // Conflicts with the `features` argument.
  optional string group = 3;
}

// `FeatureValueMap` maps feature full name to feature value.
message FeatureValueMap {
  // Keyed by feature full name; the value is that feature's value.
  map<string, Value> map = 1;
}

// `OnlineGetResponse` is a response for `OnlineGet` API.
message OnlineGetResponse {
  // The result of `OnlineGet`.
  FeatureValueMap result = 1;
}

// `OnlineMultiGetRequest` is a request for `OnlineMultiGet` API.
message OnlineMultiGetRequest {
  // A list of entity identifiers, could be device IDs, user IDs, etc.
  repeated string entity_keys = 1;

  // A list of feature full names.
  // A feature full name has the following format: <group>.<feature>,
  // for example, txn_stats.count_7d
  repeated string features = 2;
}

// `OnlineMultiGetResponse` is a response for `OnlineMultiGet` API.
message OnlineMultiGetResponse {
  // The result of `OnlineMultiGet`, mapping entity key to FeatureValueMap.
  map<string, FeatureValueMap> result = 1;
}

// `SyncRequest` is a request for `Sync` API.
message SyncRequest {
  // The group to sync from offline store to online store.
  string group = 1;

  // The revision to sync, it only applies to batch feature.
  // For batch feature: if null, will sync the latest revision;
  // otherwise, sync the designated revision.
  // For streaming feature, revision ID is not required, will always
  // sync the latest values.
  optional int32 revision_id = 2;

  // PurgeDelay represents the seconds to sleep before purging
  // the previous revision in online store.
  // It only applies to batch feature group.
  int32 purge_delay = 3;
}

// `SyncResponse` is a response for `Sync` API.
message SyncResponse {}

// `ChannelImportRequest` is a request for `ChannelImport` API.
message ChannelImportRequest {
  // The group to be imported from data source to offline store.
  // It only takes effect (and required) on the first request.
  optional string group = 1;

  // The revision of the imported data, it only applies to
  // batch feature (not required).
  // For batch features, if revision is null, will use the
  // timestamp when it starts serving in online store; otherwise,
  // use the designated revision.
  // It only takes effect on the first request.
  optional int64 revision = 2;

  // Description of this import.
  // It only takes effect on the first request.
  optional string description = 3;

  // A single row of channel import.
  bytes row = 4;
}

// `ImportResponse` is a response for `ChannelImport` and `Import` API.
message ImportResponse {
  // The revision ID of this import, it only applies to batch feature.
  int32 revision_id = 1;
}

// `ImportRequest` is a request for `Import` API.
message ImportRequest {
  // The group to be imported from data source to offline store.
  string group = 1;

  // The revision of the imported data, it only applies to
  // batch feature (not required).
  // For batch features, if revision is null, will use the
  // timestamp when it starts serving in online store; otherwise,
  // use the designated revision.
  optional int64 revision = 2;

  // Description of this import.
  optional string description = 3;

  // The path of data source.
  string input_file = 4;

  // Delimiter of data source.
  optional string delimiter = 5;
}

// `PushRequest` is a request for `Push` API.
message PushRequest {
  // An entity identifier.
  string entity_key = 1;

  // A streaming feature group.
  string group = 2;

  // Feature values maps feature name to feature value.
  map<string, Value> feature_values = 3;
}

// `PushResponse` is a response for `Push` API.
message PushResponse {}

// `SnapshotRequest` is a request for `Snapshot` API.
message SnapshotRequest {
  // A streaming feature group.
  string group = 1;
}

// `SnapshotResponse` is a response for `Snapshot` API.
message SnapshotResponse {}

// `EntityRow` represents a row in `Join` request.
message EntityRow {
  // An entity identifier, could represent device ID, user ID, etc.
  string entity_key = 1;

  // A unix milliseconds, represents the record timestamp.
  int64 unix_milli = 2;

  // A list of existing values, could be label or real-time features.
  repeated string values = 3;
}

// `ChannelJoinRequest` is a request for `ChannelJoin` API.
message ChannelJoinRequest {
  // A list of feature full names, their feature values will be
  // joined and fetched from offline store.
  // It only takes effect (and required) on the first request.
  repeated string join_features = 1;

  // A list of names, could be label name or real-time feature names.
  // Those feature values will be passed from entity row.
  // It only takes effect (and required) on the first request.
  repeated string existed_features = 2;

  // An entity row.
  EntityRow entity_row = 3;
}

// `ChannelJoinResponse` is a response for `ChannelJoin` API.
message ChannelJoinResponse {
  // Header of the `Join` response.
  // It only appears in the first response.
  repeated string header = 1;

  // A single row of joined results.
  repeated Value joined_row = 2;
}

// `JoinRequest` is a request for `Join` API.
message JoinRequest {
  // A list of feature full names, their feature values will be
  // joined and fetched from offline store.
  repeated string features = 1;

  // File path of entity rows.
  string input_file = 2;

  // File path of joined result.
  string output_file = 3;
}

// `JoinResponse` is a response for `Join` API.
message JoinResponse {}

// `ChannelExportRequest` is a request for `ChannelExport` API.
message ChannelExportRequest {
  // A list of feature full names.
  repeated string features = 1;

  // A unix milliseconds, export the feature value before this timestamp.
  int64 unix_milli = 2;

  // Limit the size of export data.
  optional uint64 limit = 3;
}

// `ChannelExportResponse` is a response for `ChannelExport` API.
message ChannelExportResponse {
  // Header of export result.
  // It only appears in the first response.
  repeated string header = 1;

  // A single row of export result.
  repeated Value row = 2;
}

// `ExportRequest` is a request for `Export` API.
message ExportRequest {
  // A list of feature full names.
  repeated string features = 1;

  // A unix milliseconds, export the feature value before this timestamp.
  int64 unix_milli = 2;

  // File path of export result.
  string output_file = 3;

  // Limit the size of export data.
  optional uint64 limit = 4;
}

// `ExportResponse` is a response for `Export` API.
message ExportResponse {}

// `HealthCheckRequest` is a request for `HealthCheck` API.
message HealthCheckRequest {}

// `HealthCheckResponse` is a response for `HealthCheck` API.
message HealthCheckResponse {}