/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

syntax = 'proto3';

import "google/protobuf/any.proto";
import "spark/connect/common.proto";
import "spark/connect/expressions.proto";
import "spark/connect/relations.proto";

package spark.connect;

option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";
option go_package = "internal/generated";

// A [[Command]] is an operation that is executed by the server and that does not directly
// consume or produce a relational result.
message Command {
  oneof command_type {
    CommonInlineUserDefinedFunction register_function = 1;
    WriteOperation write_operation = 2;
    CreateDataFrameViewCommand create_dataframe_view = 3;
    WriteOperationV2 write_operation_v2 = 4;
    SqlCommand sql_command = 5;
    WriteStreamOperationStart write_stream_operation_start = 6;
    StreamingQueryCommand streaming_query_command = 7;
    GetResourcesCommand get_resources_command = 8;
    StreamingQueryManagerCommand streaming_query_manager_command = 9;
    CommonInlineUserDefinedTableFunction register_table_function = 10;

    // This field is used to mark extensions to the protocol. When plugins generate arbitrary
    // Commands they can add them here. During the planning the correct resolution is done.
    google.protobuf.Any extension = 999;
  }
}

// A SQL Command is used to trigger the eager evaluation of SQL commands in Spark.
//
// When the SQL provided as part of the message is a command, it will be immediately evaluated
// and the result will be collected and returned as part of a LocalRelation. If the SQL is not
// a command, the operation will simply return a SQL Relation. This allows the client to be
// almost oblivious to the server-side behavior.
message SqlCommand {
  // (Required) SQL Query.
  string sql = 1;

  // (Optional) A map of parameter names to literal expressions.
  map<string, Expression.Literal> args = 2;

  // (Optional) A sequence of literal expressions for positional parameters in the SQL query text.
  repeated Expression.Literal pos_args = 3;
}
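// A minimal textproto sketch (illustrative only) of a Command carrying a SqlCommand with one
// named parameter. The query text and parameter value are hypothetical, and the sketch assumes
// Expression.Literal (spark/connect/expressions.proto) exposes a `string` literal variant:
//
//   sql_command {
//     sql: "SELECT * FROM range(10) WHERE id < :upper"
//     args {
//       key: "upper"
//       value { string: "5" }
//     }
//   }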
// A command that can create DataFrame global temp view or local temp view.
message CreateDataFrameViewCommand {
  // (Required) The relation that this view will be built on.
  Relation input = 1;

  // (Required) View name.
  string name = 2;

  // (Required) Whether this is a global temp view or a local temp view.
  bool is_global = 3;

  // (Required)
  //
  // If true, and if the view already exists, updates it; if false, and if the view
  // already exists, throws an exception.
  bool replace = 4;
}

// As writes are not directly handled during analysis and planning, they are modeled as commands.
message WriteOperation {
  // (Required) The output of the `input` relation will be persisted according to the options.
  Relation input = 1;

  // (Optional) Format value according to the Spark documentation. Examples are: text, parquet, delta.
  optional string source = 2;

  // (Optional)
  //
  // The destination of the write operation can be either a path or a table.
  // If the destination is neither a path nor a table, such as jdbc or noop,
  // the `save_type` should not be set.
  oneof save_type {
    string path = 3;
    SaveTable table = 4;
  }

  // (Required) The save mode.
  SaveMode mode = 5;

  // (Optional) List of columns to sort the output by.
  repeated string sort_column_names = 6;

  // (Optional) List of columns for partitioning.
  repeated string partitioning_columns = 7;

  // (Optional) Bucketing specification. Bucketing must set the number of buckets and the columns
  // to bucket by.
  BucketBy bucket_by = 8;

  // (Optional) A list of configuration options.
  map<string, string> options = 9;

  message SaveTable {
    // (Required) The table name.
    string table_name = 1;
    // (Required) The method to be called to write to the table.
    TableSaveMethod save_method = 2;

    enum TableSaveMethod {
      TABLE_SAVE_METHOD_UNSPECIFIED = 0;
      TABLE_SAVE_METHOD_SAVE_AS_TABLE = 1;
      TABLE_SAVE_METHOD_INSERT_INTO = 2;
    }
  }

  message BucketBy {
    repeated string bucket_column_names = 1;
    int32 num_buckets = 2;
  }

  enum SaveMode {
    SAVE_MODE_UNSPECIFIED = 0;
    SAVE_MODE_APPEND = 1;
    SAVE_MODE_OVERWRITE = 2;
    SAVE_MODE_ERROR_IF_EXISTS = 3;
    SAVE_MODE_IGNORE = 4;
  }
}

// As writes are not directly handled during analysis and planning, they are modeled as commands.
message WriteOperationV2 {
  // (Required) The output of the `input` relation will be persisted according to the options.
  Relation input = 1;

  // (Required) The destination of the write operation must be either a path or a table.
  string table_name = 2;

  // (Optional) A provider for the underlying output data source. Spark's default catalog supports
  // "parquet", "json", etc.
  optional string provider = 3;

  // (Optional) List of columns for partitioning of the output table created by `create`,
  // `createOrReplace`, or `replace`.
  repeated Expression partitioning_columns = 4;

  // (Optional) A list of configuration options.
  map<string, string> options = 5;

  // (Optional) A list of table properties.
  map<string, string> table_properties = 6;

  // (Required) Write mode.
  Mode mode = 7;

  enum Mode {
    MODE_UNSPECIFIED = 0;
    MODE_CREATE = 1;
    MODE_OVERWRITE = 2;
    MODE_OVERWRITE_PARTITIONS = 3;
    MODE_APPEND = 4;
    MODE_REPLACE = 5;
    MODE_CREATE_OR_REPLACE = 6;
  }

  // (Optional) A condition for the overwrite save mode.
  Expression overwrite_condition = 8;
}

// Starts a write stream operation as a streaming query. The Query ID and Run ID of the
// streaming query are returned.
message WriteStreamOperationStart {
  // (Required) The output of the `input` streaming relation will be written.
  Relation input = 1;

  // The following fields directly map to the API of DataStreamWriter().
  // Consult the API documentation unless explicitly documented here.

  string format = 2;
  map<string, string> options = 3;
  repeated string partitioning_column_names = 4;

  oneof trigger {
    string processing_time_interval = 5;
    bool available_now = 6;
    bool once = 7;
    string continuous_checkpoint_interval = 8;
  }

  string output_mode = 9;
  string query_name = 10;

  // The destination is optional. When set, it can be a path or a table name.
  oneof sink_destination {
    string path = 11;
    string table_name = 12;
  }

  StreamingForeachFunction foreach_writer = 13;
  StreamingForeachFunction foreach_batch = 14;
}
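// A minimal textproto sketch (illustrative only) of a WriteStreamOperationStart that writes a
// streaming relation to a path with a processing-time trigger. The option values, query name,
// and paths are hypothetical, and the required `input` relation is elided for brevity:
//
//   write_stream_operation_start {
//     format: "parquet"
//     options {
//       key: "checkpointLocation"
//       value: "/tmp/checkpoints/events"
//     }
//     processing_time_interval: "10 seconds"
//     output_mode: "append"
//     query_name: "events_sink"
//     path: "/tmp/output/events"
//   }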
message StreamingForeachFunction {
  oneof function {
    PythonUDF python_function = 1;
    ScalarScalaUDF scala_function = 2;
  }
}

message WriteStreamOperationStartResult {
  // (Required) Query instance. See `StreamingQueryInstanceId`.
  StreamingQueryInstanceId query_id = 1;

  // An optional query name.
  string name = 2;

  // TODO: How do we indicate errors?
  // TODO: Consider adding status, last progress etc here.
}

// A tuple that uniquely identifies an instance of a streaming query run. It consists of `id` that
// persists across the streaming runs and `run_id` that changes between each run of the
// streaming query that resumes from the checkpoint.
message StreamingQueryInstanceId {
  // (Required) The unique id of this query that persists across restarts from checkpoint data.
  // That is, this id is generated when a query is started for the first time, and
  // will be the same every time it is restarted from checkpoint data.
  string id = 1;

  // (Required) The unique id of this run of the query. That is, every start/restart of a query
  // will generate a unique run_id. Therefore, every time a query is restarted from
  // checkpoint, it will have the same `id` but different `run_id`s.
  string run_id = 2;
}

// Commands for a streaming query.
message StreamingQueryCommand {
  // (Required) Query instance. See `StreamingQueryInstanceId`.
  StreamingQueryInstanceId query_id = 1;

  // See documentation for the corresponding API method in StreamingQuery.
  oneof command {
    // status() API.
    bool status = 2;
    // lastProgress() API.
    bool last_progress = 3;
    // recentProgress() API.
    bool recent_progress = 4;
    // stop() API. Stops the query.
    bool stop = 5;
    // processAllAvailable() API. Waits until all the available data is processed.
    bool process_all_available = 6;
    // explain() API. Returns logical and physical plans.
    ExplainCommand explain = 7;
    // exception() API. Returns the exception in the query if any.
    bool exception = 8;
    // awaitTermination() API. Waits for the termination of the query.
    AwaitTerminationCommand await_termination = 9;
  }

  message ExplainCommand {
    // TODO: Consider reusing Explain from AnalyzePlanRequest message.
    //       We cannot do this right now since base.proto imports this file.
    bool extended = 1;
  }

  message AwaitTerminationCommand {
    optional int64 timeout_ms = 2;
  }
}

// Response for commands on a streaming query.
message StreamingQueryCommandResult {
  // (Required) Query instance id. See `StreamingQueryInstanceId`.
  StreamingQueryInstanceId query_id = 1;

  oneof result_type {
    StatusResult status = 2;
    RecentProgressResult recent_progress = 3;
    ExplainResult explain = 4;
    ExceptionResult exception = 5;
    AwaitTerminationResult await_termination = 6;
  }

  message StatusResult {
    // See documentation for the corresponding fields in the Scala 'StreamingQueryStatus' class.
    string status_message = 1;
    bool is_data_available = 2;
    bool is_trigger_active = 3;
    bool is_active = 4;
  }

  message RecentProgressResult {
    // Progress reports as an array of json strings.
    repeated string recent_progress_json = 5;
  }

  message ExplainResult {
    // Logical and physical plans as a string.
    string result = 1;
  }

  message ExceptionResult {
    // (Optional) Exception message as string, maps to the return value of the original
    // StreamingQueryException's toString method.
    optional string exception_message = 1;
    // (Optional) Exception error class as string.
    optional string error_class = 2;
    // (Optional) Exception stack trace as string.
    optional string stack_trace = 3;
  }

  message AwaitTerminationResult {
    bool terminated = 1;
  }
}
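// A minimal textproto sketch (illustrative only) of a status() round trip. The ids and the
// status message are hypothetical placeholders. The client sends a StreamingQueryCommand:
//
//   streaming_query_command {
//     query_id {
//       id: "<query-uuid>"
//       run_id: "<run-uuid>"
//     }
//     status: true
//   }
//
// and the server replies with a StreamingQueryCommandResult of the shape:
//
//   query_id {
//     id: "<query-uuid>"
//     run_id: "<run-uuid>"
//   }
//   status {
//     status_message: "Waiting for next trigger"
//     is_data_available: false
//     is_trigger_active: false
//     is_active: true
//   }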
// Commands for the streaming query manager.
message StreamingQueryManagerCommand {
  // See documentation for the corresponding API method in StreamingQueryManager.
  oneof command {
    // active() API, returns a list of active queries.
    bool active = 1;
    // get() API, returns the StreamingQuery identified by id.
    string get_query = 2;
    // awaitAnyTermination() API, waits until any query terminates or the timeout expires.
    AwaitAnyTerminationCommand await_any_termination = 3;
    // resetTerminated() API.
    bool reset_terminated = 4;
    // addListener API.
    StreamingQueryListenerCommand add_listener = 5;
    // removeListener API.
    StreamingQueryListenerCommand remove_listener = 6;
    // listListeners() API, returns a list of streaming query listeners.
    bool list_listeners = 7;
  }

  message AwaitAnyTerminationCommand {
    // (Optional) The waiting time in milliseconds to wait for any query to terminate.
    optional int64 timeout_ms = 1;
  }

  message StreamingQueryListenerCommand {
    bytes listener_payload = 1;
    optional PythonUDF python_listener_payload = 2;
    string id = 3;
  }
}

// Response for commands on the streaming query manager.
message StreamingQueryManagerCommandResult {
  oneof result_type {
    ActiveResult active = 1;
    StreamingQueryInstance query = 2;
    AwaitAnyTerminationResult await_any_termination = 3;
    bool reset_terminated = 4;
    bool add_listener = 5;
    bool remove_listener = 6;
    ListStreamingQueryListenerResult list_listeners = 7;
  }

  message ActiveResult {
    repeated StreamingQueryInstance active_queries = 1;
  }

  message StreamingQueryInstance {
    // (Required) The id and runId of this query.
    StreamingQueryInstanceId id = 1;
    // (Optional) The name of this query.
    optional string name = 2;
  }

  message AwaitAnyTerminationResult {
    bool terminated = 1;
  }

  message StreamingQueryListenerInstance {
    bytes listener_payload = 1;
  }

  message ListStreamingQueryListenerResult {
    // (Required) Reference IDs of listener instances.
    repeated string listener_ids = 1;
  }
}

// Command to get the output of 'SparkContext.resources'.
message GetResourcesCommand { }

// Response for command 'GetResourcesCommand'.
message GetResourcesCommandResult {
  map<string, ResourceInformation> resources = 1;
}
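// A minimal textproto sketch (illustrative only) of a top-level Command that asks the streaming
// query manager to wait for any query to terminate. The timeout value is hypothetical:
//
//   streaming_query_manager_command {
//     await_any_termination {
//       timeout_ms: 10000
//     }
//   }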