syntax = "proto3";

package ballista.protobuf;

option java_multiple_files = true;
option java_package = "org.ballistacompute.protobuf";
option java_outer_classname = "BallistaProto";

///////////////////////////////////////////////////////////////////////////////
// Ballista Logical Plan
///////////////////////////////////////////////////////////////////////////////

// A logical expression. The node carries one slot per expression kind
// (column reference, alias, literal, binary expression, aggregate
// expression).
//
// proto3 scalars have implicit presence (a default value is
// indistinguishable from "unset"), so each scalar slot is paired with
// explicit has_* flags.
// NOTE(review): presumably exactly one kind is populated per node; confirm
// against the serializer before relying on it.
message LogicalExprNode {
  // Column reference.
  string column_name = 10;
  bool has_column_name = 11;

  // Alias wrapping another expression.
  AliasNode alias = 14;

  // String literal.
  string literal_string = 21;
  bool has_literal_string = 22;

  // Signed integer literal. The value lives in literal_int.
  // NOTE(review): the has_literal_i* flags presumably record which integer
  // width the literal had; confirm against the serializer.
  int64 literal_int = 23;
  bool has_literal_i8 = 24;
  bool has_literal_i16 = 25;
  bool has_literal_i32 = 26;
  bool has_literal_i64 = 27;

  // Unsigned integer literal. The value lives in literal_uint.
  // NOTE(review): the has_literal_u* flags presumably record which integer
  // width the literal had; confirm against the serializer.
  uint64 literal_uint = 28;
  bool has_literal_u8 = 29;
  bool has_literal_u16 = 30;
  bool has_literal_u32 = 31;
  bool has_literal_u64 = 32;

  // 32-bit floating point literal.
  float literal_f32 = 33;
  bool has_literal_f32 = 34;

  // 64-bit floating point literal.
  double literal_f64 = 35;
  bool has_literal_f64 = 36;

  // Binary expression.
  BinaryExprNode binary_expr = 40;

  // Aggregate expression.
  AggregateExprNode aggregate_expr = 50;
}

// An expression together with the alias given to it.
message AliasNode {
  LogicalExprNode expr = 1;
  string alias = 2;
}

// A binary expression: two operand expressions (l, r) and an operator, which
// is carried as a string.
message BinaryExprNode {
  LogicalExprNode l = 1;
  LogicalExprNode r = 2;
  string op = 3;
}

// Aggregate functions usable in an AggregateExprNode.
//
// MIN is the zero value, so an unset AggregateFunction field reads as MIN.
// Do not renumber: the numbers are the wire contract.
enum AggregateFunction {
  MIN = 0;
  MAX = 1;
  SUM = 2;
  AVG = 3;
  COUNT = 4;
  COUNT_DISTINCT = 5;
}

// An aggregate function paired with the expression it is declared over.
message AggregateExprNode {
  AggregateFunction aggr_function = 1;
  LogicalExprNode expr = 2;
}

// LogicalPlan is a nested type: a node refers to its input plan through
// `input` and carries one slot per operator kind.
// NOTE(review): presumably exactly one operator slot is set per node;
// confirm against the serializer.
message LogicalPlanNode {
  LogicalPlanNode input = 1;
  ScanNode scan = 10;
  ProjectionNode projection = 20;
  SelectionNode selection = 21;
  LimitNode limit = 22;
  AggregateNode aggregate = 23;
}

// List of column names used as a scan projection (see ScanNode.projection).
message ProjectionColumns {
  repeated string columns = 1;
}

// Logical scan of a file-based data source.
// TODO break this out into separate CsvScanNode and ParquetScanNode
message ScanNode {
  string path = 1;
  ProjectionColumns projection = 2;
  Schema schema = 3;
  string file_format = 4;  // parquet or csv
  bool has_header = 5;     // csv specific
}

// Logical projection: the list of expressions to project.
message ProjectionNode {
  repeated LogicalExprNode expr = 1;
}

// Logical selection holding a single expression.
// Note: the expression uses field number 2, not 1; keep it as is for wire
// compatibility.
message SelectionNode {
  LogicalExprNode expr = 2;
}

// Logical aggregate: grouping expressions and aggregate expressions.
message AggregateNode {
  repeated LogicalExprNode group_expr = 1;
  repeated LogicalExprNode aggr_expr = 2;
}

// Logical limit.
message LimitNode {
  uint32 limit = 1;
}

///////////////////////////////////////////////////////////////////////////////
// Ballista Physical Plan
///////////////////////////////////////////////////////////////////////////////

// PhysicalPlanNode is a nested type: a node refers to its input plan through
// `input` and carries one slot per physical operator kind.
// NOTE(review): presumably exactly one operator slot is set per node;
// confirm against the serializer.
message PhysicalPlanNode {
  PhysicalPlanNode input = 1;
  ScanExecNode scan = 10;
  ProjectionExecNode projection = 20;
  SelectionExecNode selection = 21;
  GlobalLimitExecNode global_limit = 22;
  LocalLimitExecNode local_limit = 23;
  HashAggregateExecNode hash_aggregate = 30;
  ShuffleReaderExecNode shuffle_reader = 40;
}

// Physical scan of a file-based data source.
// Unlike ScanNode, the projection here is a list of unsigned integers rather
// than column names.
// NOTE(review): presumably these are column indices into `schema`; confirm.
message ScanExecNode {
  string path = 1;
  repeated uint32 projection = 2;
  Schema schema = 3;
  string file_format = 4;  // parquet or csv
  bool has_header = 5;     // csv specific
  uint32 batch_size = 6;
  // partition filenames
  repeated string filename = 8;
}

// Physical projection: the list of expressions to project.
message ProjectionExecNode {
  repeated LogicalExprNode expr = 1;
}

// Physical selection holding a single expression.
// Note: the expression uses field number 2, not 1; keep it as is for wire
// compatibility.
message SelectionExecNode {
  LogicalExprNode expr = 2;
}

// Mode of a hash aggregate.
//
// PARTIAL is the zero value, so an unset AggregateMode field reads as
// PARTIAL. Do not renumber: the numbers are the wire contract.
enum AggregateMode {
  PARTIAL = 0;
  FINAL = 1;
  COMPLETE = 2;
}

// Physical hash aggregate: grouping expressions, aggregate expressions, and
// the aggregation mode.
message HashAggregateExecNode {
  repeated LogicalExprNode group_expr = 1;
  repeated LogicalExprNode aggr_expr = 2;
  AggregateMode mode = 3;
}

// Physical operator referencing a set of shuffle partitions by id, together
// with a schema.
message ShuffleReaderExecNode {
  repeated ShuffleId shuffle_id = 1;
  Schema schema = 2;
}

// Physical global limit.
message GlobalLimitExecNode {
  uint32 limit = 1;
}

// Physical local limit.
message LocalLimitExecNode {
  uint32 limit = 1;
}

///////////////////////////////////////////////////////////////////////////////
// Ballista Scheduling
///////////////////////////////////////////////////////////////////////////////

// A string key/value pair (used for Action.settings).
message KeyValuePair {
  string key = 1;
  string value = 2;
}

// A request sent to an executor.
// NOTE(review): presumably exactly one of query / task / fetch_shuffle is
// set per Action; confirm against the executor implementation.
message Action {
  // interactive query
  LogicalPlanNode query = 1;

  // Execute query and store resulting shuffle partition in memory
  Task task = 2;

  // Fetch a shuffle partition from an executor
  ShuffleId fetch_shuffle = 3;

  // configuration settings
  repeated KeyValuePair settings = 100;
}

// A unit of work: a physical plan plus the job, stage, task and partition
// identifiers it belongs to.
message Task {
  string job_uuid = 1;
  uint32 stage_id = 2;
  uint32 task_id = 3;
  uint32 partition_id = 4;
  PhysicalPlanNode plan = 5;
  // The task could need to read shuffle output from another task
  repeated ShuffleLocation shuffle_loc = 6;
}

// Mapping from shuffle id to executor id: the job/stage/partition triple of
// a shuffle partition plus the id, host and port of an executor.
// NOTE(review): field number 3 is unused here; avoid assigning it without
// checking whether it was deliberately skipped.
message ShuffleLocation {
  string job_uuid = 1;
  uint32 stage_id = 2;
  uint32 partition_id = 4;
  string executor_id = 5;
  string executor_host = 6;
  uint32 executor_port = 7;
}

// Identifies a shuffle partition by job, stage and partition. Uses the same
// field numbers as the corresponding fields of ShuffleLocation.
// NOTE(review): field number 3 is unused here; avoid assigning it without
// checking whether it was deliberately skipped.
message ShuffleId {
  string job_uuid = 1;
  uint32 stage_id = 2;
  uint32 partition_id = 4;
}

///////////////////////////////////////////////////////////////////////////////
// Arrow Data Types
///////////////////////////////////////////////////////////////////////////////

// A schema: an ordered list of fields (columns).
message Schema {
  repeated Field columns = 1;
}

// A single field of a schema.
message Field {
  // name of the field
  string name = 1;
  ArrowType arrow_type = 2;
  bool nullable = 3;
  // for complex data types like structs, unions
  repeated Field children = 4;
}

// copied from GandivaType from Apache Arrow project
//
// NONE is the zero value, so an unset ArrowType field reads as NONE.
enum ArrowType {
  NONE = 0;     // arrow::Type::NA
  BOOL = 1;     // arrow::Type::BOOL
  UINT8 = 2;    // arrow::Type::UINT8
  INT8 = 3;     // arrow::Type::INT8
  UINT16 = 4;   // represents arrow::Type fields in src/arrow/type.h
  INT16 = 5;
  UINT32 = 6;
  INT32 = 7;
  UINT64 = 8;
  INT64 = 9;
  HALF_FLOAT = 10;
  FLOAT = 11;
  DOUBLE = 12;
  UTF8 = 13;
  BINARY = 14;
  FIXED_SIZE_BINARY = 15;
  DATE32 = 16;
  DATE64 = 17;
  TIMESTAMP = 18;
  TIME32 = 19;
  TIME64 = 20;
  INTERVAL = 21;
  DECIMAL = 22;
  LIST = 23;
  STRUCT = 24;
  UNION = 25;
  DICTIONARY = 26;
  MAP = 27;
}