/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

syntax = 'proto3';

package spark.connect;

import "google/protobuf/any.proto";
import "spark/connect/expressions.proto";
import "spark/connect/types.proto";
import "spark/connect/catalog.proto";

option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";
option go_package = "internal/generated";

// The main [[Relation]] type. Fundamentally, a relation is a typed container
// that has exactly one explicit relation type set.
//
// When adding new relation types, they have to be registered here.
message Relation {
  RelationCommon common = 1;
  oneof rel_type {
    Read read = 2;
    Project project = 3;
    Filter filter = 4;
    Join join = 5;
    SetOperation set_op = 6;
    Sort sort = 7;
    Limit limit = 8;
    Aggregate aggregate = 9;
    SQL sql = 10;
    LocalRelation local_relation = 11;
    Sample sample = 12;
    Offset offset = 13;
    Deduplicate deduplicate = 14;
    Range range = 15;
    SubqueryAlias subquery_alias = 16;
    Repartition repartition = 17;
    ToDF to_df = 18;
    WithColumnsRenamed with_columns_renamed = 19;
    ShowString show_string = 20;
    Drop drop = 21;
    Tail tail = 22;
    WithColumns with_columns = 23;
    Hint hint = 24;
    Unpivot unpivot = 25;
    ToSchema to_schema = 26;
    RepartitionByExpression repartition_by_expression = 27;
    MapPartitions map_partitions = 28;
    CollectMetrics collect_metrics = 29;
    Parse parse = 30;
    GroupMap group_map = 31;
    CoGroupMap co_group_map = 32;
    WithWatermark with_watermark = 33;
    ApplyInPandasWithState apply_in_pandas_with_state = 34;
    HtmlString html_string = 35;
    CachedLocalRelation cached_local_relation = 36;
    CachedRemoteRelation cached_remote_relation = 37;
    CommonInlineUserDefinedTableFunction common_inline_user_defined_table_function = 38;

    // NA functions
    NAFill fill_na = 90;
    NADrop drop_na = 91;
    NAReplace replace = 92;

    // stat functions
    StatSummary summary = 100;
    StatCrosstab crosstab = 101;
    StatDescribe describe = 102;
    StatCov cov = 103;
    StatCorr corr = 104;
    StatApproxQuantile approx_quantile = 105;
    StatFreqItems freq_items = 106;
    StatSampleBy sample_by = 107;

    // Catalog API (experimental / unstable)
    Catalog catalog = 200;

    // This field is used to mark extensions to the protocol. When plugins generate arbitrary
    // relations they can add them here. During the planning the correct resolution is done.
    google.protobuf.Any extension = 998;
    Unknown unknown = 999;
  }
}

// Used for testing purposes only.
message Unknown {}

// Common metadata of all relations.
message RelationCommon {
  // (Required) Shared relation metadata.
  string source_info = 1;

  // (Optional) A per-client globally unique id for a given connect plan.
  optional int64 plan_id = 2;
}
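// Illustrative sketch (non-normative, not part of the protocol): a client call such as
// `spark.read.table("t").filter(...).limit(10)` could be encoded as nested relations,
// roughly like the following simplified text-format plan; real clients may also populate
// RelationCommon metadata such as plan_id on each node.
//
//   limit {
//     limit: 10
//     input {
//       filter {
//         condition { ... }
//         input {
//           read { named_table { unparsed_identifier: "t" } }
//         }
//       }
//     }
//   }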
// Relation that uses a SQL query to generate the output.
message SQL {
  // (Required) The SQL query.
  string query = 1;

  // (Optional) A map of parameter names to literal expressions.
  map<string, Expression.Literal> args = 2;

  // (Optional) A sequence of literal expressions for positional parameters in the SQL query text.
  repeated Expression.Literal pos_args = 3;
}

// Relation that reads from a file / table or other data source. Does not have additional
// inputs.
message Read {
  oneof read_type {
    NamedTable named_table = 1;
    DataSource data_source = 2;
  }

  // (Optional) Indicates if this is a streaming read.
  bool is_streaming = 3;

  message NamedTable {
    // (Required) Unparsed identifier for the table.
    string unparsed_identifier = 1;

    // Options for the named table. The map key is case insensitive.
    map<string, string> options = 2;
  }

  message DataSource {
    // (Optional) Supported formats include: parquet, orc, text, json, csv, avro.
    //
    // If not set, the value from the SQL conf 'spark.sql.sources.default' will be used.
    optional string format = 1;

    // (Optional) If not set, Spark will infer the schema.
    //
    // This schema string should be either DDL-formatted or JSON-formatted.
    optional string schema = 2;

    // Options for the data source. The context of this map varies based on the
    // data source format. These options may be empty for a valid data source format.
    // The map key is case insensitive.
    map<string, string> options = 3;

    // (Optional) A list of paths for file-system backed data sources.
    repeated string paths = 4;

    // (Optional) Condition in the where clause for each partition.
    //
    // This is only supported by the JDBC data source.
    repeated string predicates = 5;
  }
}

// Projection of a bag of expressions for a given input relation.
//
// The input relation must be specified.
// The projected expression can be an arbitrary expression.
message Project {
  // (Optional) Input relation is optional for Project.
  //
  // For example, `SELECT ABS(-1)` is a valid plan without an input plan.
  Relation input = 1;

  // (Required) A Project requires at least one expression.
  repeated Expression expressions = 3;
}

// Relation that applies a boolean expression `condition` on each row of `input` to produce
// the output result.
message Filter {
  // (Required) Input relation for a Filter.
  Relation input = 1;

  // (Required) A Filter must have a condition expression.
  Expression condition = 2;
}

// Relation of type [[Join]].
//
// `left` and `right` must be present.
message Join {
  // (Required) Left input relation for a Join.
  Relation left = 1;

  // (Required) Right input relation for a Join.
  Relation right = 2;

  // (Optional) The join condition. Could be unset when `using_columns` is utilized.
  //
  // This field does not co-exist with using_columns.
  Expression join_condition = 3;

  // (Required) The join type.
  JoinType join_type = 4;

  // Optional. using_columns provides a list of columns that should be present on both sides of
  // the join inputs that this Join will join on. For example A JOIN B USING col_name is
  // equivalent to A JOIN B ON A.col_name = B.col_name.
  //
  // This field does not co-exist with join_condition.
  repeated string using_columns = 5;

  enum JoinType {
    JOIN_TYPE_UNSPECIFIED = 0;
    JOIN_TYPE_INNER = 1;
    JOIN_TYPE_FULL_OUTER = 2;
    JOIN_TYPE_LEFT_OUTER = 3;
    JOIN_TYPE_RIGHT_OUTER = 4;
    JOIN_TYPE_LEFT_ANTI = 5;
    JOIN_TYPE_LEFT_SEMI = 6;
    JOIN_TYPE_CROSS = 7;
  }

  // (Optional) Only used by joinWith. Set the left and right join data types.
  optional JoinDataType join_data_type = 6;

  message JoinDataType {
    // If the left data type is a struct.
    bool is_left_struct = 1;
    // If the right data type is a struct.
    bool is_right_struct = 2;
  }
}
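// Illustrative sketch (non-normative): a client call such as
// `left.join(right, on=["id"], how="inner")` could be encoded roughly as
//
//   join {
//     left { ... }
//     right { ... }
//     join_type: JOIN_TYPE_INNER
//     using_columns: "id"
//   }
//
// whereas an explicit predicate like `left.join(right, left["id"] == right["id"])`
// would instead populate `join_condition` and leave `using_columns` empty.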
// Relation of type [[SetOperation]]
message SetOperation {
  // (Required) Left input relation for a Set operation.
  Relation left_input = 1;

  // (Required) Right input relation for a Set operation.
  Relation right_input = 2;

  // (Required) The Set operation type.
  SetOpType set_op_type = 3;

  // (Optional) Whether to keep duplicate rows.
  //
  // True to preserve all results.
  // False to remove duplicate rows.
  optional bool is_all = 4;

  // (Optional) Whether to perform the Set operation based on name resolution.
  //
  // Only UNION supports this option.
  optional bool by_name = 5;

  // (Optional) Whether to perform the Set operation while allowing missing columns.
  //
  // Only UNION supports this option.
  optional bool allow_missing_columns = 6;

  enum SetOpType {
    SET_OP_TYPE_UNSPECIFIED = 0;
    SET_OP_TYPE_INTERSECT = 1;
    SET_OP_TYPE_UNION = 2;
    SET_OP_TYPE_EXCEPT = 3;
  }
}

// Relation of type [[Limit]] that is used to `limit` rows from the input relation.
message Limit {
  // (Required) Input relation for a Limit.
  Relation input = 1;

  // (Required) the limit.
  int32 limit = 2;
}

// Relation of type [[Offset]] that is used to read rows starting from the `offset` on
// the input relation.
message Offset {
  // (Required) Input relation for an Offset.
  Relation input = 1;

  // (Required) the offset.
  int32 offset = 2;
}

// Relation of type [[Tail]] that is used to fetch `limit` rows from the end of the input relation.
message Tail {
  // (Required) Input relation for a Tail.
  Relation input = 1;

  // (Required) the limit.
  int32 limit = 2;
}

// Relation of type [[Aggregate]].
message Aggregate {
  // (Required) Input relation for a RelationalGroupedDataset.
  Relation input = 1;

  // (Required) How the RelationalGroupedDataset was built.
  GroupType group_type = 2;

  // (Required) Expressions for grouping keys.
  repeated Expression grouping_expressions = 3;

  // (Required) List of values that will be translated to columns in the output DataFrame.
  repeated Expression aggregate_expressions = 4;

  // (Optional) Pivots a column of the current `DataFrame` and performs the specified aggregation.
  Pivot pivot = 5;

  enum GroupType {
    GROUP_TYPE_UNSPECIFIED = 0;
    GROUP_TYPE_GROUPBY = 1;
    GROUP_TYPE_ROLLUP = 2;
    GROUP_TYPE_CUBE = 3;
    GROUP_TYPE_PIVOT = 4;
  }

  message Pivot {
    // (Required) The column to pivot.
    Expression col = 1;

    // (Optional) List of values that will be translated to columns in the output DataFrame.
    //
    // Note that if it is empty, the server side will immediately trigger a job to collect
    // the distinct values of the column.
    repeated Expression.Literal values = 2;
  }
}

// Relation of type [[Sort]].
message Sort {
  // (Required) Input relation for a Sort.
  Relation input = 1;

  // (Required) The ordering expressions.
  repeated Expression.SortOrder order = 2;

  // (Optional) if this is a global sort.
  optional bool is_global = 3;
}

// Drop specified columns.
message Drop {
  // (Required) The input relation.
  Relation input = 1;

  // (Optional) columns to drop.
  repeated Expression columns = 2;

  // (Optional) names of columns to drop.
  repeated string column_names = 3;
}

// Relation of type [[Deduplicate]] which has duplicate rows removed; it can consider either only
// a subset of the columns or all of the columns.
message Deduplicate {
  // (Required) Input relation for a Deduplicate.
  Relation input = 1;

  // (Optional) Deduplicate based on a list of column names.
  //
  // This field does not co-exist with `all_columns_as_keys`.
  repeated string column_names = 2;

  // (Optional) Deduplicate based on all the columns of the input relation.
  //
  // This field does not co-exist with `column_names`.
  optional bool all_columns_as_keys = 3;

  // (Optional) Deduplicate within the time range of watermark.
  optional bool within_watermark = 4;
}
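// Illustrative sketch (non-normative): `df.dropDuplicates(["a", "b"])` could be encoded as
//
//   deduplicate {
//     input { ... }
//     column_names: "a"
//     column_names: "b"
//   }
//
// while a plain `df.distinct()` would instead set `all_columns_as_keys: true`.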
// A relation that does not need to be qualified by name.
message LocalRelation {
  // (Optional) Local collection data serialized into Arrow IPC streaming format which contains
  // the schema of the data.
  optional bytes data = 1;

  // (Optional) The schema of local data.
  // It should be either a DDL-formatted type string or a JSON string.
  //
  // The server side will update the column names and data types according to this schema.
  // If the 'data' is not provided, then this schema will be required.
  optional string schema = 2;
}

// A local relation that has been cached already.
message CachedLocalRelation {
  // `userId` and `sessionId` fields are deleted since the server must always use the active
  // session/user rather than arbitrary values provided by the client. It is never valid to access
  // a local relation from a different session/user.
  reserved 1, 2;
  reserved "userId", "sessionId";

  // (Required) A SHA-256 hash of the serialized local relation in proto, see LocalRelation.
  string hash = 3;
}

// Represents a remote relation that has been cached on server.
message CachedRemoteRelation {
  // (Required) ID of the remote relation (assigned by the service).
  string relation_id = 1;
}

// Relation of type [[Sample]] that samples a fraction of the dataset.
message Sample {
  // (Required) Input relation for a Sample.
  Relation input = 1;

  // (Required) lower bound.
  double lower_bound = 2;

  // (Required) upper bound.
  double upper_bound = 3;

  // (Optional) Whether to sample with replacement.
  optional bool with_replacement = 4;

  // (Optional) The random seed.
  optional int64 seed = 5;

  // (Required) Explicitly sort the underlying plan to make the ordering deterministic or cache it.
  // This flag is true when invoking `dataframe.randomSplit` to randomly split the DataFrame with
  // the provided weights. Otherwise, it is false.
  bool deterministic_order = 6;
}

// Relation of type [[Range]] that generates a sequence of integers.
message Range {
  // (Optional) Default value = 0
  optional int64 start = 1;

  // (Required)
  int64 end = 2;

  // (Required)
  int64 step = 3;

  // Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if
  // it is set, or 2) spark default parallelism.
  optional int32 num_partitions = 4;
}

// Relation alias.
message SubqueryAlias {
  // (Required) The input relation of SubqueryAlias.
  Relation input = 1;

  // (Required) The alias.
  string alias = 2;

  // (Optional) Qualifier of the alias.
  repeated string qualifier = 3;
}

// Relation repartition.
message Repartition {
  // (Required) The input relation of Repartition.
  Relation input = 1;

  // (Required) Must be positive.
  int32 num_partitions = 2;

  // (Optional) Default value is false.
  optional bool shuffle = 3;
}

// Compose the string representing rows for output.
// It will invoke 'Dataset.showString' to compute the results.
message ShowString {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) Number of rows to show.
  int32 num_rows = 2;

  // (Required) If set to more than 0, truncates strings to
  // `truncate` characters and all cells will be aligned right.
  int32 truncate = 3;

  // (Required) If set to true, prints output rows vertically (one line per column value).
  bool vertical = 4;
}

// Compose the string representing rows for output.
// It will invoke 'Dataset.htmlString' to compute the results.
message HtmlString {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) Number of rows to show.
  int32 num_rows = 2;

  // (Required) If set to more than 0, truncates strings to
  // `truncate` characters and all cells will be aligned right.
  int32 truncate = 3;
}
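// Illustrative sketch (non-normative): `spark.range(5, 100, 5)` could be encoded as
//
//   range {
//     start: 5
//     end: 100
//     step: 5
//   }
//
// Leaving `start` unset defaults it to 0, and leaving `num_partitions` unset lets the
// server pick the parallelism as described above.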
// Computes specified statistics for numeric and string columns.
// It will invoke 'Dataset.summary' (same as 'StatFunctions.summary')
// to compute the results.
message StatSummary {
  // (Required) The input relation.
  Relation input = 1;

  // (Optional) Statistics to be computed.
  //
  // Available statistics are:
  //  count
  //  mean
  //  stddev
  //  min
  //  max
  //  arbitrary approximate percentiles specified as a percentage (e.g. 75%)
  //  count_distinct
  //  approx_count_distinct
  //
  // If no statistics are given, this function computes 'count', 'mean', 'stddev', 'min',
  // 'approximate quartiles' (percentiles at 25%, 50%, and 75%), and 'max'.
  repeated string statistics = 2;
}

// Computes basic statistics for numeric and string columns, including count, mean, stddev, min,
// and max. If no columns are given, this function computes statistics for all numerical or
// string columns.
message StatDescribe {
  // (Required) The input relation.
  Relation input = 1;

  // (Optional) Columns to compute statistics on.
  repeated string cols = 2;
}

// Computes a pair-wise frequency table of the given columns. Also known as a contingency table.
// It will invoke 'Dataset.stat.crosstab' (same as 'StatFunctions.crossTabulate')
// to compute the results.
message StatCrosstab {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The name of the first column.
  //
  // Distinct items will make the first item of each row.
  string col1 = 2;

  // (Required) The name of the second column.
  //
  // Distinct items will make the column names of the DataFrame.
  string col2 = 3;
}

// Calculate the sample covariance of two numerical columns of a DataFrame.
// It will invoke 'Dataset.stat.cov' (same as 'StatFunctions.calculateCov') to compute the results.
message StatCov {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The name of the first column.
  string col1 = 2;

  // (Required) The name of the second column.
  string col2 = 3;
}

// Calculates the correlation of two columns of a DataFrame. Currently only supports the Pearson
// Correlation Coefficient. It will invoke 'Dataset.stat.corr' (same as
// 'StatFunctions.pearsonCorrelation') to compute the results.
message StatCorr {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The name of the first column.
  string col1 = 2;

  // (Required) The name of the second column.
  string col2 = 3;

  // (Optional) Default value is 'pearson'.
  //
  // Currently only supports the Pearson Correlation Coefficient.
  optional string method = 4;
}

// Calculates the approximate quantiles of numerical columns of a DataFrame.
// It will invoke 'Dataset.stat.approxQuantile' (same as 'StatFunctions.approxQuantile')
// to compute the results.
message StatApproxQuantile {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The names of the numerical columns.
  repeated string cols = 2;

  // (Required) A list of quantile probabilities.
  //
  // Each number must belong to [0, 1].
  // For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
  repeated double probabilities = 3;

  // (Required) The relative target precision to achieve (greater than or equal to 0).
  //
  // If set to zero, the exact quantiles are computed, which could be very expensive.
  // Note that values greater than 1 are accepted but give the same result as 1.
  double relative_error = 4;
}
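// Illustrative sketch (non-normative): `df.stat.approxQuantile("age", [0.25, 0.5, 0.75], 0.01)`
// could be encoded as
//
//   approx_quantile {
//     input { ... }
//     cols: "age"
//     probabilities: 0.25
//     probabilities: 0.5
//     probabilities: 0.75
//     relative_error: 0.01
//   }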
// Finding frequent items for columns, possibly with false positives.
// It will invoke 'Dataset.stat.freqItems' (same as 'StatFunctions.freqItems')
// to compute the results.
message StatFreqItems {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The names of the columns to search frequent items in.
  repeated string cols = 2;

  // (Optional) The minimum frequency for an item to be considered `frequent`.
  // Should be greater than 1e-4.
  optional double support = 3;
}

// Returns a stratified sample without replacement based on the fraction
// given on each stratum.
// It will invoke 'Dataset.stat.sampleBy' to compute the results.
message StatSampleBy {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The column that defines strata.
  Expression col = 2;

  // (Required) Sampling fraction for each stratum.
  //
  // If a stratum is not specified, we treat its fraction as zero.
  repeated Fraction fractions = 3;

  // (Optional) The random seed.
  optional int64 seed = 5;

  message Fraction {
    // (Required) The stratum.
    Expression.Literal stratum = 1;

    // (Required) The fraction value. Must be in [0, 1].
    double fraction = 2;
  }
}

// Replaces null values.
// It will invoke 'Dataset.na.fill' (same as 'DataFrameNaFunctions.fill') to compute the results.
// The following 3 parameter combinations are supported:
//  1, 'values' only contains 1 item, 'cols' is empty:
//    replaces null values in all type-compatible columns.
//  2, 'values' only contains 1 item, 'cols' is not empty:
//    replaces null values in specified columns.
//  3, 'values' contains more than 1 item, then 'cols' is required to have the same length:
//    replaces each specified column with the corresponding value.
message NAFill {
  // (Required) The input relation.
  Relation input = 1;

  // (Optional) Optional list of column names to consider.
  repeated string cols = 2;

  // (Required) Values to replace null values with.
  //
  // Should contain at least 1 item.
  // Only 4 data types are supported now: bool, long, double, string.
  repeated Expression.Literal values = 3;
}

// Drop rows containing null values.
// It will invoke 'Dataset.na.drop' (same as 'DataFrameNaFunctions.drop') to compute the results.
message NADrop {
  // (Required) The input relation.
  Relation input = 1;

  // (Optional) Optional list of column names to consider.
  //
  // When it is empty, all the columns in the input relation will be considered.
  repeated string cols = 2;

  // (Optional) The minimum number of non-null and non-NaN values required to keep.
  //
  // When not set, it is equivalent to the number of considered columns, which means
  // a row will be kept only if all columns are non-null.
  //
  // 'how' options ('all', 'any') can be easily converted to this field:
  //   - 'all' -> set 'min_non_nulls' 1;
  //   - 'any' -> keep 'min_non_nulls' unset;
  optional int32 min_non_nulls = 3;
}

// Replaces old values with the corresponding values.
// It will invoke 'Dataset.na.replace' (same as 'DataFrameNaFunctions.replace')
// to compute the results.
message NAReplace {
  // (Required) The input relation.
  Relation input = 1;

  // (Optional) List of column names to consider.
  //
  // When it is empty, all the type-compatible columns in the input relation will be considered.
  repeated string cols = 2;

  // (Optional) The value replacement mapping.
  repeated Replacement replacements = 3;

  message Replacement {
    // (Required) The old value.
    //
    // Only 4 data types are supported now: null, bool, double, string.
    Expression.Literal old_value = 1;

    // (Required) The new value.
    //
    // Should be of the same data type as the old value.
    Expression.Literal new_value = 2;
  }
}
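// Illustrative sketch (non-normative): a Python client call like
// `df.na.fill({"age": 0, "name": "unknown"})` could be encoded as parameter combination 3
// above, roughly
//
//   fill_na {
//     input { ... }
//     cols: "age"
//     cols: "name"
//     values { long: 0 }
//     values { string: "unknown" }
//   }
//
// The literal field names (`long`, `string`) refer to Expression.Literal and are shown here
// only for illustration.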
// Rename columns on the input relation by the same length of names.
message ToDF {
  // (Required) The input relation of RenameColumnsBySameLengthNames.
  Relation input = 1;

  // (Required)
  //
  // The number of columns of the input relation must be equal to the length
  // of this field. If this is not true, an exception will be returned.
  repeated string column_names = 2;
}

// Rename columns on the input relation by a map with name to name mapping.
message WithColumnsRenamed {
  // (Required) The input relation.
  Relation input = 1;

  // (Required)
  //
  // Renaming column names of input relation from A to B where A is the map key
  // and B is the map value. This is a no-op if the schema doesn't contain any A. It
  // does not require all input relation column names to be present as keys.
  // Duplicated B values are not allowed.
  map<string, string> rename_columns_map = 2;
}

// Adding columns or replacing the existing columns that have the same names.
message WithColumns {
  // (Required) The input relation.
  Relation input = 1;

  // (Required)
  //
  // Given a column name, apply the corresponding expression on the column. If the column
  // name exists in the input relation, then replace the column. If the column name
  // does not exist in the input relation, then it is added as a new column.
  //
  // Only one name part is expected from each Expression.Alias.
  //
  // An exception is thrown when duplicated names are present in the mapping.
  repeated Expression.Alias aliases = 2;
}

message WithWatermark {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) Name of the column containing event time.
  string event_time = 2;

  // (Required)
  string delay_threshold = 3;
}

// Specify a hint over a relation. Hint should have a name and optional parameters.
message Hint {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) Hint name.
  //
  // Supported Join hints include BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL.
  //
  // Supported partitioning hints include COALESCE, REPARTITION, REPARTITION_BY_RANGE.
  string name = 2;

  // (Optional) Hint parameters.
  repeated Expression parameters = 3;
}

// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns set.
message Unpivot {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) Id columns.
  repeated Expression ids = 2;

  // (Optional) Value columns to unpivot.
  optional Values values = 3;

  // (Required) Name of the variable column.
  string variable_column_name = 4;

  // (Required) Name of the value column.
  string value_column_name = 5;

  message Values {
    repeated Expression values = 1;
  }
}

message ToSchema {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The user provided schema.
  //
  // The server side will update the dataframe with this schema.
  DataType schema = 2;
}

message RepartitionByExpression {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) The partitioning expressions.
  repeated Expression partition_exprs = 2;

  // (Optional) number of partitions, must be positive.
  optional int32 num_partitions = 3;
}

message MapPartitions {
  // (Required) Input relation for a mapPartitions-equivalent API: mapInPandas, mapInArrow.
  Relation input = 1;

  // (Required) Input user-defined function.
  CommonInlineUserDefinedFunction func = 2;

  // (Optional) Whether to use barrier mode execution or not.
  optional bool is_barrier = 3;
}
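// Illustrative sketch (non-normative): `df.mapInPandas(fn, schema)` could be encoded as
//
//   map_partitions {
//     input { ... }
//     func { ... }   // a CommonInlineUserDefinedFunction (see expressions.proto) carrying
//                    // the serialized Python function and its return schema
//   }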
message GroupMap {
  // (Required) Input relation for Group Map API: apply, applyInPandas.
  Relation input = 1;

  // (Required) Expressions for grouping keys.
  repeated Expression grouping_expressions = 2;

  // (Required) Input user-defined function.
  CommonInlineUserDefinedFunction func = 3;

  // (Optional) Expressions for sorting. Only used by Scala Sorted Group Map API.
  repeated Expression sorting_expressions = 4;

  // Below fields are only used by (Flat)MapGroupsWithState

  // (Optional) Input relation for initial State.
  Relation initial_input = 5;

  // (Optional) Expressions for grouping keys of the initial state input relation.
  repeated Expression initial_grouping_expressions = 6;

  // (Optional) True if MapGroupsWithState, false if FlatMapGroupsWithState.
  optional bool is_map_groups_with_state = 7;

  // (Optional) The output mode of the function.
  optional string output_mode = 8;

  // (Optional) Timeout configuration for groups that do not receive data for a while.
  optional string timeout_conf = 9;
}

message CoGroupMap {
  // (Required) One input relation for CoGroup Map API - applyInPandas.
  Relation input = 1;

  // Expressions for grouping keys of the first input relation.
  repeated Expression input_grouping_expressions = 2;

  // (Required) The other input relation.
  Relation other = 3;

  // Expressions for grouping keys of the other input relation.
  repeated Expression other_grouping_expressions = 4;

  // (Required) Input user-defined function.
  CommonInlineUserDefinedFunction func = 5;

  // (Optional) Expressions for sorting. Only used by Scala Sorted CoGroup Map API.
  repeated Expression input_sorting_expressions = 6;

  // (Optional) Expressions for sorting. Only used by Scala Sorted CoGroup Map API.
  repeated Expression other_sorting_expressions = 7;
}

message ApplyInPandasWithState {
  // (Required) Input relation for applyInPandasWithState.
  Relation input = 1;

  // (Required) Expressions for grouping keys.
  repeated Expression grouping_expressions = 2;

  // (Required) Input user-defined function.
  CommonInlineUserDefinedFunction func = 3;

  // (Required) Schema for the output DataFrame.
  string output_schema = 4;

  // (Required) Schema for the state.
  string state_schema = 5;

  // (Required) The output mode of the function.
  string output_mode = 6;

  // (Required) Timeout configuration for groups that do not receive data for a while.
  string timeout_conf = 7;
}

message CommonInlineUserDefinedTableFunction {
  // (Required) Name of the user-defined table function.
  string function_name = 1;

  // (Optional) Whether the user-defined table function is deterministic.
  bool deterministic = 2;

  // (Optional) Function input arguments. Empty arguments are allowed.
  repeated Expression arguments = 3;

  // (Required) Type of the user-defined table function.
  oneof function {
    PythonUDTF python_udtf = 4;
  }
}

message PythonUDTF {
  // (Optional) Return type of the Python UDTF.
  optional DataType return_type = 1;

  // (Required) EvalType of the Python UDTF.
  int32 eval_type = 2;

  // (Required) The encoded commands of the Python UDTF.
  bytes command = 3;

  // (Required) Python version being used in the client.
  string python_ver = 4;
}

// Collect arbitrary (named) metrics from a dataset.
message CollectMetrics {
  // (Required) The input relation.
  Relation input = 1;

  // (Required) Name of the metrics.
  string name = 2;

  // (Required) The metric sequence.
  repeated Expression metrics = 3;
}
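// Illustrative sketch (non-normative): `df.observe("my_metrics", count(lit(1)))` could be
// encoded as
//
//   collect_metrics {
//     input { ... }
//     name: "my_metrics"
//     metrics { ... }   // an Expression for count(lit(1))
//   }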
message Parse {
  // (Required) Input relation to Parse. The input is expected to have a single text column.
  Relation input = 1;

  // (Required) The expected format of the text.
  ParseFormat format = 2;

  // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
  optional DataType schema = 3;

  // Options for the csv/json parser. The map key is case insensitive.
  map<string, string> options = 4;

  enum ParseFormat {
    PARSE_FORMAT_UNSPECIFIED = 0;
    PARSE_FORMAT_CSV = 1;
    PARSE_FORMAT_JSON = 2;
  }
}
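// Illustrative sketch (non-normative): a client request to parse an existing single-column
// text relation as CSV could be encoded as
//
//   parse {
//     input { ... }
//     format: PARSE_FORMAT_CSV
//     options { key: "header" value: "true" }   // a CSV parser option, shown for illustration
//   }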