/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

syntax = "proto3";

package spark.connect;

import "google/protobuf/any.proto";
import "spark/connect/commands.proto";
import "spark/connect/common.proto";
import "spark/connect/expressions.proto";
import "spark/connect/relations.proto";
import "spark/connect/types.proto";

option java_multiple_files = true;
option java_package = "org.apache.spark.connect.proto";
option go_package = "internal/generated";

// A [[Plan]] is the structure that carries the runtime information for the execution from the
// client to the server. A [[Plan]] can either be of the type [[Relation]], which is a reference
// to the underlying logical plan, or it can be of the [[Command]] type that is used to execute
// commands on the server.
message Plan {
  oneof op_type {
    Relation root = 1;
    Command command = 2;
  }
}
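// As an illustration, a minimal [[Plan]] wrapping a SQL query might look like the
// following in protobuf text format. This is a sketch that assumes the `sql` variant
// with a `query` field defined in relations.proto; the exact relation shape may differ
// across Spark versions:
//
//   root {
//     sql {
//       query: "SELECT 1 AS id"
//     }
//   }
//
// A command plan would populate `command` instead of `root`; the two are mutually
// exclusive because they share the `op_type` oneof.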
// User Context is used to refer to one particular user session that is executing
// queries in the backend.
message UserContext {
  string user_id = 1;
  string user_name = 2;

  // To extend the existing user context message that is used to identify incoming requests,
  // Spark Connect leverages the Any protobuf type that can be used to inject arbitrary other
  // messages into this message. Extensions are stored as a `repeated` type to be able to
  // handle multiple active extensions.
  repeated google.protobuf.Any extensions = 999;
}

// Request to perform plan analysis, optionally to explain the plan.
message AnalyzePlanRequest {
  // (Required)
  //
  // The session_id specifies a Spark session for a user id (which is specified
  // by user_context.user_id). The session_id is set by the client to be able to
  // collate streaming responses from different queries within the dedicated session.
  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string session_id = 1;

  // (Required) User context
  UserContext user_context = 2;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 3;

  oneof analyze {
    Schema schema = 4;
    Explain explain = 5;
    TreeString tree_string = 6;
    IsLocal is_local = 7;
    IsStreaming is_streaming = 8;
    InputFiles input_files = 9;
    SparkVersion spark_version = 10;
    DDLParse ddl_parse = 11;
    SameSemantics same_semantics = 12;
    SemanticHash semantic_hash = 13;
    Persist persist = 14;
    Unpersist unpersist = 15;
    GetStorageLevel get_storage_level = 16;
  }

  message Schema {
    // (Required) The logical plan to be analyzed.
    Plan plan = 1;
  }

  // Explains the input plan based on a configurable mode.
  message Explain {
    // (Required) The logical plan to be analyzed.
    Plan plan = 1;

    // (Required) For AnalyzePlan RPC calls, configures the mode in which the plan is explained
    // as a string.
    ExplainMode explain_mode = 2;

    // Plan explanation mode.
    enum ExplainMode {
      EXPLAIN_MODE_UNSPECIFIED = 0;

      // Generates only the physical plan.
      EXPLAIN_MODE_SIMPLE = 1;

      // Generates the parsed logical plan, analyzed logical plan, optimized logical plan and
      // physical plan. The parsed logical plan is the unresolved plan extracted from the query.
      // The analyzed logical plan resolves it, translating unresolvedAttribute and
      // unresolvedRelation into fully typed objects. The optimized logical plan is obtained by
      // applying a set of optimization rules to the analyzed plan, and is then translated into
      // the physical plan.
      EXPLAIN_MODE_EXTENDED = 2;

      // Generates code for the statement, if any, and a physical plan.
      EXPLAIN_MODE_CODEGEN = 3;

      // If plan node statistics are available, generates a logical plan and also the statistics.
      EXPLAIN_MODE_COST = 4;

      // Generates a physical plan outline and also node details.
      EXPLAIN_MODE_FORMATTED = 5;
    }
  }

  message TreeString {
    // (Required) The logical plan to be analyzed.
    Plan plan = 1;

    // (Optional) Max level of the schema.
    optional int32 level = 2;
  }

  message IsLocal {
    // (Required) The logical plan to be analyzed.
    Plan plan = 1;
  }

  message IsStreaming {
    // (Required) The logical plan to be analyzed.
    Plan plan = 1;
  }

  message InputFiles {
    // (Required) The logical plan to be analyzed.
    Plan plan = 1;
  }

  message SparkVersion { }

  message DDLParse {
    // (Required) The DDL formatted string to be parsed.
    string ddl_string = 1;
  }

  // Returns `true` when the logical query plans are equal and therefore return the same results.
  message SameSemantics {
    // (Required) The plan to be compared.
    Plan target_plan = 1;

    // (Required) The other plan to be compared.
    Plan other_plan = 2;
  }

  message SemanticHash {
    // (Required) The logical plan to get a hashCode.
    Plan plan = 1;
  }

  message Persist {
    // (Required) The logical plan to persist.
    Relation relation = 1;

    // (Optional) The storage level.
    optional StorageLevel storage_level = 2;
  }

  message Unpersist {
    // (Required) The logical plan to unpersist.
    Relation relation = 1;

    // (Optional) Whether to block until all blocks are deleted.
    optional bool blocking = 2;
  }

  message GetStorageLevel {
    // (Required) The logical plan to get the storage level.
    Relation relation = 1;
  }
}
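// For example, a request asking the server for an extended explain string of a SQL
// relation could be encoded as follows in protobuf text format (the session and user
// ids are placeholders; the relation shape follows the Plan sketch above):
//
//   session_id: "00112233-4455-6677-8899-aabbccddeeff"
//   user_context { user_id: "user-1" }
//   explain {
//     plan { root { sql { query: "SELECT 1 AS id" } } }
//     explain_mode: EXPLAIN_MODE_EXTENDED
//   }
//
// The server answers with an AnalyzePlanResponse whose `explain.explain_string` field
// contains the rendered plan.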
// Response to performing analysis of the query. Contains relevant metadata to be able to
// reason about the performance.
message AnalyzePlanResponse {
  string session_id = 1;

  oneof result {
    Schema schema = 2;
    Explain explain = 3;
    TreeString tree_string = 4;
    IsLocal is_local = 5;
    IsStreaming is_streaming = 6;
    InputFiles input_files = 7;
    SparkVersion spark_version = 8;
    DDLParse ddl_parse = 9;
    SameSemantics same_semantics = 10;
    SemanticHash semantic_hash = 11;
    Persist persist = 12;
    Unpersist unpersist = 13;
    GetStorageLevel get_storage_level = 14;
  }

  message Schema {
    DataType schema = 1;
  }

  message Explain {
    string explain_string = 1;
  }

  message TreeString {
    string tree_string = 1;
  }

  message IsLocal {
    bool is_local = 1;
  }

  message IsStreaming {
    bool is_streaming = 1;
  }

  message InputFiles {
    // A best-effort snapshot of the files that compose this Dataset.
    repeated string files = 1;
  }

  message SparkVersion {
    string version = 1;
  }

  message DDLParse {
    DataType parsed = 1;
  }

  message SameSemantics {
    bool result = 1;
  }

  message SemanticHash {
    int32 result = 1;
  }

  message Persist { }

  message Unpersist { }

  message GetStorageLevel {
    // (Required) The StorageLevel as a result of the get_storage_level request.
    StorageLevel storage_level = 1;
  }
}

// A request to be executed by the service.
message ExecutePlanRequest {
  // (Required)
  //
  // The session_id specifies a Spark session for a user id (which is specified
  // by user_context.user_id). The session_id is set by the client to be able to
  // collate streaming responses from different queries within the dedicated session.
  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string session_id = 1;

  // (Required) User context
  //
  // user_context.user_id and session_id both identify a unique remote Spark session on the
  // server side.
  UserContext user_context = 2;

  // (Optional)
  // Provide an id for this request. If not provided, it will be generated by the server.
  // It is returned in every ExecutePlanResponse.operation_id of the ExecutePlan response stream.
  // The id must be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  optional string operation_id = 6;

  // (Required) The logical plan to be executed / analyzed.
  Plan plan = 3;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 4;

  // Repeated element for options that can be passed to the request. This element is currently
  // unused but allows passing in an extension value used for arbitrary options.
  repeated RequestOption request_options = 5;

  message RequestOption {
    oneof request_option {
      ReattachOptions reattach_options = 1;
      // Extension type for request options.
      google.protobuf.Any extension = 999;
    }
  }

  // Tags to tag the given execution with.
  // Tags cannot contain the ',' character and cannot be empty strings.
  // Used by Interrupt with interrupt.tag.
  repeated string tags = 7;
}
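// A typical reattachable execution request, in protobuf text format, tags the run so
// it can later be interrupted as a group (placeholder ids; the `sql` relation shape
// follows the Plan sketch above):
//
//   session_id: "00112233-4455-6677-8899-aabbccddeeff"
//   user_context { user_id: "user-1" }
//   operation_id: "10203040-5060-7080-90a0-b0c0d0e0f001"
//   plan { root { sql { query: "SELECT * FROM range(10)" } } }
//   request_options { reattach_options { reattachable: true } }
//   tags: "nightly-report"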
// The response of a query. There can be one or more responses for each request; responses
// belonging to the same input query carry the same `session_id`.
message ExecutePlanResponse {
  string session_id = 1;

  // Identifies the ExecutePlan execution.
  // If set by the client in ExecutePlanRequest.operation_id, that value is returned.
  // Otherwise it is generated by the server.
  // It is a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string operation_id = 12;

  // Identifies the response in the stream.
  // The id is a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string response_id = 13;

  // Union type for the different response messages.
  oneof response_type {
    ArrowBatch arrow_batch = 2;

    // Special case for executing SQL commands.
    SqlCommandResult sql_command_result = 5;

    // Response for a streaming query.
    WriteStreamOperationStartResult write_stream_operation_start_result = 8;

    // Response for commands on a streaming query.
    StreamingQueryCommandResult streaming_query_command_result = 9;

    // Response for 'SparkContext.resources'.
    GetResourcesCommandResult get_resources_command_result = 10;

    // Response for commands on the streaming query manager.
    StreamingQueryManagerCommandResult streaming_query_manager_command_result = 11;

    // Response type informing if the stream is complete in reattachable execution.
    ResultComplete result_complete = 14;

    // Support arbitrary result objects.
    google.protobuf.Any extension = 999;
  }

  // Metrics for the query execution. Typically, this field is only present in the last
  // batch of results and then represents the overall state of the query execution.
  Metrics metrics = 4;

  // The metrics observed during the execution of the query plan.
  repeated ObservedMetrics observed_metrics = 6;

  // (Optional) The Spark schema. This field is available when `collect` is called.
  DataType schema = 7;

  // A SQL command returns an opaque Relation that can be directly used as input for the next
  // call.
  message SqlCommandResult {
    Relation relation = 1;
  }

  // A batch of query results, serialized as Arrow record batches.
  message ArrowBatch {
    int64 row_count = 1;
    bytes data = 2;
  }

  message Metrics {
    repeated MetricObject metrics = 1;

    message MetricObject {
      string name = 1;
      int64 plan_id = 2;
      int64 parent = 3;
      map<string, MetricValue> execution_metrics = 4;
    }

    message MetricValue {
      string name = 1;
      int64 value = 2;
      string metric_type = 3;
    }
  }

  message ObservedMetrics {
    string name = 1;
    repeated Expression.Literal values = 2;
  }

  message ResultComplete {
    // If present, in a reattachable execution this means that after the server sends onComplete,
    // the execution is complete. If the server sends onComplete without sending a ResultComplete,
    // it means that there is more, and the client should use the ReattachExecute RPC to continue.
  }
}

// The key-value pair for the config request and response.
message KeyValue {
  // (Required) The key.
  string key = 1;

  // (Optional) The value.
  optional string value = 2;
}

// Request to update or fetch the configurations.
message ConfigRequest {
  // (Required)
  //
  // The session_id specifies a Spark session for a user id (which is specified
  // by user_context.user_id). The session_id is set by the client to be able to
  // collate streaming responses from different queries within the dedicated session.
  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string session_id = 1;

  // (Required) User context
  UserContext user_context = 2;

  // (Required) The operation for the config.
  Operation operation = 3;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 4;

  message Operation {
    oneof op_type {
      Set set = 1;
      Get get = 2;
      GetWithDefault get_with_default = 3;
      GetOption get_option = 4;
      GetAll get_all = 5;
      Unset unset = 6;
      IsModifiable is_modifiable = 7;
    }
  }

  message Set {
    // (Required) The config key-value pairs to set.
    repeated KeyValue pairs = 1;
  }

  message Get {
    // (Required) The config keys to get.
    repeated string keys = 1;
  }

  message GetWithDefault {
    // (Required) The config key-value pairs to get. The value will be used as the default value.
    repeated KeyValue pairs = 1;
  }

  message GetOption {
    // (Required) The config keys to get optionally.
    repeated string keys = 1;
  }

  message GetAll {
    // (Optional) The prefix of the config key to get.
    optional string prefix = 1;
  }

  message Unset {
    // (Required) The config keys to unset.
    repeated string keys = 1;
  }

  message IsModifiable {
    // (Required) The config keys to check whether they are modifiable.
    repeated string keys = 1;
  }
}

// Response to the config request.
message ConfigResponse {
  string session_id = 1;

  // (Optional) The result key-value pairs.
  //
  // Available when the operation is 'Get', 'GetWithDefault', 'GetOption', 'GetAll'.
  // Also available for the operation 'IsModifiable' with the boolean strings "true" and "false".
  repeated KeyValue pairs = 2;

  // (Optional)
  //
  // Warning messages for deprecated or unsupported configurations.
  repeated string warnings = 3;
}
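// For instance, setting a SQL configuration and reading it back are two separate
// Config calls; the first could look like this in protobuf text format (placeholder
// ids and value):
//
//   session_id: "00112233-4455-6677-8899-aabbccddeeff"
//   user_context { user_id: "user-1" }
//   operation {
//     set { pairs { key: "spark.sql.shuffle.partitions" value: "8" } }
//   }
//
// A second request whose operation is `get { keys: "spark.sql.shuffle.partitions" }`
// would then receive a ConfigResponse echoing the pair back in `pairs`.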
// Request to transfer client-local artifacts.
message AddArtifactsRequest {
  // (Required)
  //
  // The session_id specifies a Spark session for a user id (which is specified
  // by user_context.user_id). The session_id is set by the client to be able to
  // collate streaming responses from different queries within the dedicated session.
  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string session_id = 1;

  // User context
  UserContext user_context = 2;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 6;

  // A chunk of an Artifact.
  message ArtifactChunk {
    // Data chunk.
    bytes data = 1;
    // CRC to allow the server to verify the integrity of the chunk.
    int64 crc = 2;
  }

  // An artifact that is contained in a single `ArtifactChunk`.
  // Generally, this message represents tiny artifacts such as REPL-generated class files.
  message SingleChunkArtifact {
    // The name of the artifact is expected in the form of a "Relative Path" that is made up of a
    // sequence of directories and the final file element.
    // Examples of "Relative Path"s: "jars/test.jar", "classes/xyz.class", "abc.xyz", "a/b/X.jar".
    // The server is expected to maintain the hierarchy of files as defined by their name. (i.e.
    // The relative path of the file on the server's filesystem will be the same as the name of
    // the provided artifact.)
    string name = 1;
    // A single data chunk.
    ArtifactChunk data = 2;
  }

  // A number of `SingleChunkArtifact`s batched into a single RPC.
  message Batch {
    repeated SingleChunkArtifact artifacts = 1;
  }

  // Signals the beginning/start of a chunked artifact.
  // A large artifact is transferred through a payload of `BeginChunkedArtifact` followed by a
  // sequence of `ArtifactChunk`s.
  message BeginChunkedArtifact {
    // Name of the artifact undergoing chunking. Follows the same conventions as the `name` in
    // the `Artifact` message.
    string name = 1;
    // Total size of the artifact in bytes.
    int64 total_bytes = 2;
    // Number of chunks the artifact is split into.
    // This includes the `initial_chunk`.
    int64 num_chunks = 3;
    // The first/initial chunk.
    ArtifactChunk initial_chunk = 4;
  }

  // The payload is either a batch of artifacts or a partial chunk of a large artifact.
  oneof payload {
    Batch batch = 3;
    // The metadata and the initial chunk of a large artifact chunked into multiple requests.
    // The server side is notified about the total size of the large artifact as well as the
    // number of chunks to expect.
    BeginChunkedArtifact begin_chunk = 4;
    // A chunk of an artifact excluding metadata. This can be any chunk of a large artifact
    // excluding the first chunk (which is included in `BeginChunkedArtifact`).
    ArtifactChunk chunk = 5;
  }
}

// Response to adding an artifact. Contains relevant metadata to verify the successful transfer
// of artifact(s).
message AddArtifactsResponse {
  // Metadata of an artifact.
  message ArtifactSummary {
    string name = 1;
    // Whether the CRC (Cyclic Redundancy Check) is successful on server verification.
    // The server discards any artifact that fails the CRC.
    // If false, the client may choose to resend the artifact specified by `name`.
    bool is_crc_successful = 2;
  }

  // The list of artifact(s) seen by the server.
  repeated ArtifactSummary artifacts = 1;
}
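// To illustrate the chunking protocol: uploading a jar split into three chunks sends
// three AddArtifactsRequest messages on the client stream, sketched here in protobuf
// text format (sizes, CRCs and byte payloads are placeholders):
//
//   // request 1: metadata plus the first chunk
//   begin_chunk {
//     name: "jars/test.jar"
//     total_bytes: 3000000
//     num_chunks: 3
//     initial_chunk { data: "..." crc: 11111 }
//   }
//   // requests 2 and 3: the remaining raw chunks
//   chunk { data: "..." crc: 22222 }
//   chunk { data: "..." crc: 33333 }
//
// Small artifacts skip the chunking protocol entirely and travel together in a single
// `batch { artifacts { ... } }` payload.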
// Request to get the current statuses of artifacts on the server side.
message ArtifactStatusesRequest {
  // (Required)
  //
  // The session_id specifies a Spark session for a user id (which is specified
  // by user_context.user_id). The session_id is set by the client to be able to
  // collate streaming responses from different queries within the dedicated session.
  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string session_id = 1;

  // User context
  UserContext user_context = 2;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 3;

  // The name of the artifact is expected in the form of a "Relative Path" that is made up of a
  // sequence of directories and the final file element.
  // Examples of "Relative Path"s: "jars/test.jar", "classes/xyz.class", "abc.xyz", "a/b/X.jar".
  // The server is expected to maintain the hierarchy of files as defined by their name. (i.e.
  // The relative path of the file on the server's filesystem will be the same as the name of
  // the provided artifact.)
  repeated string names = 4;
}

// Response to checking artifact statuses.
message ArtifactStatusesResponse {
  message ArtifactStatus {
    // Whether the particular artifact exists at the server.
    bool exists = 1;
  }

  // A map of artifact names to their statuses.
  map<string, ArtifactStatus> statuses = 1;
}

message InterruptRequest {
  // (Required)
  //
  // The session_id specifies a Spark session for a user id (which is specified
  // by user_context.user_id). The session_id is set by the client to be able to
  // collate streaming responses from different queries within the dedicated session.
  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
  string session_id = 1;

  // (Required) User context
  UserContext user_context = 2;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 3;

  // (Required) The type of interrupt to execute.
  InterruptType interrupt_type = 4;

  enum InterruptType {
    INTERRUPT_TYPE_UNSPECIFIED = 0;

    // Interrupt all running executions within the session with the provided session_id.
    INTERRUPT_TYPE_ALL = 1;

    // Interrupt all running executions within the session with the provided operation_tag.
    INTERRUPT_TYPE_TAG = 2;

    // Interrupt the running execution within the session with the provided operation_id.
    INTERRUPT_TYPE_OPERATION_ID = 3;
  }

  oneof interrupt {
    // If interrupt_type == INTERRUPT_TYPE_TAG, interrupt operations with this tag.
    string operation_tag = 5;

    // If interrupt_type == INTERRUPT_TYPE_OPERATION_ID, interrupt the operation with this
    // operation_id.
    string operation_id = 6;
  }
}

message InterruptResponse {
  // Session id in which the interrupt was running.
  string session_id = 1;

  // Operation ids of the executions which were interrupted.
  repeated string interrupted_ids = 2;
}
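// For example, cancelling every execution that was started with the tag used in the
// ExecutePlanRequest sketch above (placeholder ids):
//
//   session_id: "00112233-4455-6677-8899-aabbccddeeff"
//   user_context { user_id: "user-1" }
//   interrupt_type: INTERRUPT_TYPE_TAG
//   operation_tag: "nightly-report"
//
// The InterruptResponse then lists the operation ids that were actually interrupted
// in `interrupted_ids`.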
message ReattachOptions {
  // If true, the request can be reattached to using ReattachExecute.
  // ReattachExecute can be used either if the stream broke with a gRPC network error,
  // or if the server closed the stream without sending a response with StreamStatus.complete=true.
  // The server will keep a buffer of responses in case a response is lost, and
  // ReattachExecute needs to back-track.
  //
  // If false, the execution response stream will not be reattachable, and all responses are
  // immediately released by the server after being sent.
  bool reattachable = 1;
}

message ReattachExecuteRequest {
  // (Required)
  //
  // The session_id of the request to reattach to.
  // This must be the id of an existing session.
  string session_id = 1;

  // (Required) User context
  //
  // user_context.user_id and session_id both identify a unique remote Spark session on the
  // server side.
  UserContext user_context = 2;

  // (Required)
  // Provide an id of the request to reattach to.
  // This must be the id of an existing operation.
  string operation_id = 3;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 4;

  // (Optional)
  // Last already processed response id from the response stream.
  // After reattach, the server will resume the response stream after that response.
  // If not specified, the server will restart the stream from the start.
  //
  // Note: the server controls the amount of responses that it buffers, and it may drop responses
  // that are far behind the latest returned response, so this can't be used to arbitrarily
  // scroll back the cursor. If the response is no longer available, this will result in an error.
  optional string last_response_id = 5;
}

message ReleaseExecuteRequest {
  // (Required)
  //
  // The session_id of the request to release.
  // This must be the id of an existing session.
  string session_id = 1;

  // (Required) User context
  //
  // user_context.user_id and session_id both identify a unique remote Spark session on the
  // server side.
  UserContext user_context = 2;

  // (Required)
  // Provide an id of the request to release.
  // This must be the id of an existing operation.
  string operation_id = 3;

  // Provides optional information about the client sending the request. This field
  // can be used for language or version specific information and is only intended for
  // logging purposes and will not be interpreted by the server.
  optional string client_type = 4;

  // Release and close the operation completely.
  // This will also interrupt the query if it is still running, and wait for it to be torn down.
  message ReleaseAll {}

  // Release all responses from the operation response stream up to and including
  // the response given by response_id.
  // While the server determines by itself how much of a buffer of responses to keep, the client
  // providing explicit release calls will help reduce resource consumption.
  // No-op if response_id is not found in the cached responses.
  message ReleaseUntil {
    string response_id = 1;
  }

  oneof release {
    ReleaseAll release_all = 5;
    ReleaseUntil release_until = 6;
  }
}

message ReleaseExecuteResponse {
  // Session id in which the release was running.
  string session_id = 1;

  // Operation id of the operation on which the release executed.
  // If the operation couldn't be found (e.g. because it was concurrently released), this will be
  // unset. Otherwise, it will be equal to the operation_id from the request.
  optional string operation_id = 2;
}
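// Putting the pieces together, a client recovering from a dropped stream would send a
// ReattachExecuteRequest such as the following (all ids are placeholders), resuming
// right after the last response it fully processed:
//
//   session_id: "00112233-4455-6677-8899-aabbccddeeff"
//   user_context { user_id: "user-1" }
//   operation_id: "10203040-5060-7080-90a0-b0c0d0e0f001"
//   last_response_id: "aabbccdd-0000-1111-2222-333344445555"
//
// Periodically issuing ReleaseExecute with a `release_until` payload lets the server
// trim its response buffer early, and a final `release_all { }` tears the operation
// down once a ResultComplete has been observed.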
// Main interface for the SparkConnect service.
service SparkConnectService {
  // Executes a request that contains the query and returns a stream of
  // [[ExecutePlanResponse]].
  //
  // It is guaranteed that there is at least one Arrow batch returned even if the result set is
  // empty.
  rpc ExecutePlan(ExecutePlanRequest) returns (stream ExecutePlanResponse) {}

  // Analyzes a query and returns an [[AnalyzePlanResponse]] containing metadata about the query.
  rpc AnalyzePlan(AnalyzePlanRequest) returns (AnalyzePlanResponse) {}

  // Updates or fetches the configurations and returns a [[ConfigResponse]] containing the
  // result.
  rpc Config(ConfigRequest) returns (ConfigResponse) {}

  // Adds artifacts to the session and returns an [[AddArtifactsResponse]] containing metadata
  // about the added artifacts.
  rpc AddArtifacts(stream AddArtifactsRequest) returns (AddArtifactsResponse) {}

  // Checks the statuses of artifacts in the session and returns them in an
  // [[ArtifactStatusesResponse]].
  rpc ArtifactStatus(ArtifactStatusesRequest) returns (ArtifactStatusesResponse) {}

  // Interrupts running executions.
  rpc Interrupt(InterruptRequest) returns (InterruptResponse) {}

  // Reattach to an existing reattachable execution.
  // The ExecutePlan must have been started with ReattachOptions.reattachable=true.
  // If the ExecutePlanResponse stream ends without a ResultComplete message, there is more to
  // continue. If there is a ResultComplete, the client should use ReleaseExecute with
  // ReleaseAll to close the execution.
  rpc ReattachExecute(ReattachExecuteRequest) returns (stream ExecutePlanResponse) {}

  // Release a reattachable execution, or parts thereof.
  // The ExecutePlan must have been started with ReattachOptions.reattachable=true.
  // Non-reattachable executions are released automatically and immediately after the ExecutePlan
  // RPC, and ReleaseExecute may not be used.
  rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {}
}
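// A typical client lifecycle against this service, sketched end to end: pick a fresh
// session_id to open a session; upload dependencies with AddArtifacts and confirm them
// via ArtifactStatus; inspect plans with AnalyzePlan; run them with ExecutePlan, with
// reattachable executions recovering through ReattachExecute and being cleaned up with
// ReleaseExecute; and use Interrupt to cancel work by session, tag, or operation id.
// This ordering is a usage sketch implied by the messages above, not a constraint
// enforced by the protocol.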