// Event messages for the flyteidl.event package: workflow-, node- and
// task-execution events, together with the metadata messages they carry.
syntax = "proto3";

package flyteidl.event;

option go_package = "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event";

import "flyteidl/core/literals.proto";
import "flyteidl/core/compiler.proto";
import "flyteidl/core/execution.proto";
import "flyteidl/core/identifier.proto";
import "flyteidl/core/catalog.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

// Event reported for a workflow execution.
message WorkflowExecutionEvent {
  // Workflow execution id
  core.WorkflowExecutionIdentifier execution_id = 1;

  // the id of the originator (Propeller) of the event
  string producer_id = 2;

  // Phase of the workflow execution associated with this event.
  core.WorkflowExecution.Phase phase = 3;

  // This timestamp represents when the original event occurred, it is
  // generated by the executor of the workflow.
  google.protobuf.Timestamp occurred_at = 4;

  // Result of the execution; at most one of these is set.
  oneof output_result {
    // URL to the output of the execution, it encodes all the information
    // including Cloud source provider. i.e., s3://...
    string output_uri = 5;

    // Error information for the execution
    core.ExecutionError error = 6;

    // Raw output data produced by this workflow execution.
    core.LiteralMap output_data = 7;
  }
}

// Event reported for a node execution.
message NodeExecutionEvent {
  // Unique identifier for this node execution
  core.NodeExecutionIdentifier id = 1;

  // the id of the originator (Propeller) of the event
  string producer_id = 2;

  // Phase of the node execution associated with this event.
  core.NodeExecution.Phase phase = 3;

  // This timestamp represents when the original event occurred, it is
  // generated by the executor of the node.
  google.protobuf.Timestamp occurred_at = 4;

  // Input of the node execution; at most one of these is set.
  oneof input_value {
    // URI of the input data for this node execution.
    string input_uri = 5;

    // Raw input data consumed by this node execution.
    core.LiteralMap input_data = 20;
  }

  // Result of the node execution; at most one of these is set.
  oneof output_result {
    // URL to the output of the execution, it encodes all the information
    // including Cloud source provider. i.e., s3://...
    string output_uri = 6;

    // Error information for the execution
    core.ExecutionError error = 7;

    // Raw output data produced by this node execution.
    core.LiteralMap output_data = 15;
  }

  // Additional metadata to do with this event's node target based
  // on the node type
  oneof target_metadata {
    // Metadata for a node that launches a workflow.
    WorkflowNodeMetadata workflow_node_metadata = 8;

    // Metadata for a node that runs a task.
    TaskNodeMetadata task_node_metadata = 14;
  }

  // [To be deprecated] Specifies which task (if any) launched this node.
  ParentTaskExecutionMetadata parent_task_metadata = 9;

  // Specifies the parent node of the current node execution. Node executions
  // at level zero will not have a parent node.
  ParentNodeExecutionMetadata parent_node_metadata = 10;

  // Retry group to indicate grouping of nodes by retries
  string retry_group = 11;

  // Identifier of the node in the original workflow/graph
  // This maps to value of WorkflowTemplate.nodes[X].id
  string spec_node_id = 12;

  // Friendly readable name for the node
  string node_name = 13;

  // Version of this event message.
  // NOTE(review): presumably signals versioned changes in how data is
  // reported, as TaskExecutionEvent.event_version does - confirm.
  int32 event_version = 16;

  // Whether this node launched a subworkflow.
  bool is_parent = 17;

  // Whether this node yielded a dynamic workflow.
  bool is_dynamic = 18;

  // String location uniquely identifying where the deck HTML file is.
  // NativeUrl specifies the url in the format of the configured storage
  // provider (e.g. s3://my-bucket/randomstring/suffix.tar)
  string deck_uri = 19;

  // This timestamp represents the instant when the event was reported by the
  // executing framework. For example, when first processing a node the
  // `occurred_at` timestamp should be the instant propeller makes progress, so
  // when literal inputs are initially copied. The event however will not be
  // sent until after the copy completes. Extracting both of these timestamps
  // facilitates a more accurate portrayal of the evaluation time-series.
  google.protobuf.Timestamp reported_at = 21;

  // Indicates if this node is an ArrayNode.
  bool is_array = 22;
}

// For Workflow Nodes we need to send information about the workflow that's
// launched
message WorkflowNodeMetadata {
  // Identifier of the workflow execution launched by the node.
  core.WorkflowExecutionIdentifier execution_id = 1;
}

// Metadata reported for nodes that execute a task.
message TaskNodeMetadata {
  // Captures the status of caching for this execution.
  core.CatalogCacheStatus cache_status = 1;

  // This structure carries the catalog artifact information
  core.CatalogMetadata catalog_key = 2;

  // Captures the status of cache reservations for this execution.
  core.CatalogReservation.Status reservation_status = 3;

  // The latest checkpoint location
  string checkpoint_uri = 4;

  // In the case this task launched a dynamic workflow we capture its structure
  // here.
  DynamicWorkflowNodeMetadata dynamic_workflow = 16;
}

// For dynamic workflow nodes we send information about the dynamic workflow
// definition that gets generated.
message DynamicWorkflowNodeMetadata {
  // id represents the unique identifier of the workflow.
  core.Identifier id = 1;

  // Represents the compiled representation of the embedded dynamic workflow.
  core.CompiledWorkflowClosure compiled_workflow = 2;

  // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message
  // for this DynamicWorkflow. This is required to correctly recover partially
  // completed executions where the workflow has already been compiled.
  string dynamic_job_spec_uri = 3;
}

// Identifies the task execution that launched a node (see
// NodeExecutionEvent.parent_task_metadata).
message ParentTaskExecutionMetadata {
  // Identifier of the parent task execution.
  core.TaskExecutionIdentifier id = 1;
}

// Identifies the parent node of a node execution (see
// NodeExecutionEvent.parent_node_metadata).
message ParentNodeExecutionMetadata {
  // Unique identifier of the parent node id within the execution
  // This is value of core.NodeExecutionIdentifier.node_id of the parent node
  string node_id = 1;
}

// A timestamped explanation attached to an event.
message EventReason {
  // An explanation for this event
  string reason = 1;

  // The time this reason occurred
  google.protobuf.Timestamp occurred_at = 2;
}

// Plugin specific execution event information. For tasks like Python, Hive,
// Spark, DynamicJob.
message TaskExecutionEvent {
  // ID of the task. In combination with the retryAttempt this will indicate
  // the task execution uniquely for a given parent node execution.
  core.Identifier task_id = 1;

  // A task execution is always kicked off by a node execution, the event
  // consumer will use the parent_id to relate the task to its parent node
  // execution
  core.NodeExecutionIdentifier parent_node_execution_id = 2;

  // retry attempt number for this task, i.e., 2 for the second attempt
  uint32 retry_attempt = 3;

  // Phase associated with the event
  core.TaskExecution.Phase phase = 4;

  // id of the process that sent this event, mainly for trace debugging
  string producer_id = 5;

  // log information for the task execution
  repeated core.TaskLog logs = 6;

  // This timestamp represents when the original event occurred, it is
  // generated by the executor of the task.
  google.protobuf.Timestamp occurred_at = 7;

  // Input of the task execution; at most one of these is set.
  oneof input_value {
    // URI of the input file, it encodes all the information
    // including Cloud source provider. i.e., s3://...
    string input_uri = 8;

    // Raw input data consumed by this task execution.
    core.LiteralMap input_data = 19;
  }

  // Result of the task execution; at most one of these is set.
  oneof output_result {
    // URI to the output of the execution, it will be in a format that encodes
    // all the information including Cloud source provider. i.e., s3://...
    string output_uri = 9;

    // Error information for the execution
    core.ExecutionError error = 10;

    // Raw output data produced by this task execution.
    core.LiteralMap output_data = 17;
  }

  // Custom data that the task plugin sends back. This is extensible to allow
  // various plugins in the system.
  google.protobuf.Struct custom_info = 11;

  // Some phases, like RUNNING, can send multiple events with changed metadata
  // (new logs, additional custom_info, etc) that should be recorded regardless
  // of the lack of phase change. The version field should be incremented when
  // metadata changes across the duration of an individual phase.
  uint32 phase_version = 12;

  // An optional explanation for the phase transition.
  // Deprecated: Use reasons instead.
  string reason = 13 [deprecated = true];

  // An optional list of explanations for the phase transition.
  repeated EventReason reasons = 21;

  // A predefined yet extensible Task type identifier. If the task definition
  // is already registered in flyte admin this type will be identical, but not
  // all task executions necessarily use pre-registered definitions and this
  // type is useful to render the task in the UI, filter task executions, etc.
  string task_type = 14;

  // Metadata around how a task was executed.
  TaskExecutionMetadata metadata = 16;

  // The event version is used to indicate versioned changes in how data is
  // reported using this proto message. For example, event_version > 0 means
  // that map tasks report logs using the TaskExecutionMetadata
  // ExternalResourceInfo fields for each subtask rather than the TaskLog in
  // this message.
  int32 event_version = 18;

  // This timestamp represents the instant when the event was reported by the
  // executing framework. For example, a k8s pod task may be marked completed
  // at (i.e. `occurred_at`) the instant the container running user code
  // completes, but this event will not be reported until the pod is marked as
  // completed. Extracting both of these timestamps facilitates a more accurate
  // portrayal of the evaluation time-series.
  google.protobuf.Timestamp reported_at = 20;
}

// This message contains metadata about external resources produced or used by
// a specific task execution.
message ExternalResourceInfo {
  // Identifier for an external resource created by this task execution, for
  // example Qubole query ID or presto query ids.
  string external_id = 1;

  // A unique index for the external resource with respect to all external
  // resources for this task. Although the identifier may change between task
  // reporting events or retries, this will remain the same to enable
  // aggregating information from multiple reports.
  uint32 index = 2;

  // Retry attempt number for this external resource, i.e., 2 for the second
  // attempt
  uint32 retry_attempt = 3;

  // Phase associated with the external resource
  core.TaskExecution.Phase phase = 4;

  // Captures the status of caching for this external resource execution.
  core.CatalogCacheStatus cache_status = 5;

  // log information for the external resource execution
  repeated core.TaskLog logs = 6;
}

// This message holds task execution metadata specific to resource allocation
// used to manage concurrent executions for a project namespace.
message ResourcePoolInfo {
  // Unique resource ID used to identify this execution when allocating a
  // token.
  string allocation_token = 1;

  // Namespace under which this task execution requested an allocation token.
  string namespace = 2;
}

// Holds metadata around how a task was executed.
// As a task transitions across event phases during execution some attributes,
// such as its generated name, generated external resources, and more may grow
// in size but not change necessarily based on the phase transition that
// sparked the event update. Metadata is a container for these attributes
// across the task execution lifecycle.
message TaskExecutionMetadata {
  // Unique, generated name for this task execution used by the backend.
  string generated_name = 1;

  // Additional data on external resources on other back-ends or platforms
  // (e.g. Hive, Qubole, etc) launched by this task execution.
  repeated ExternalResourceInfo external_resources = 2;

  // Includes additional data on concurrent resource management used during
  // execution. This is a repeated field because a plugin can request multiple
  // resource allocations during execution.
  repeated ResourcePoolInfo resource_pool_info = 3;

  // The identifier of the plugin used to execute this task.
  string plugin_identifier = 4;

  // Includes the broad category of machine used for this specific task
  // execution.
  enum InstanceClass {
    // The default instance class configured for the flyte application
    // platform.
    DEFAULT = 0;

    // The instance class configured for interruptible tasks.
    INTERRUPTIBLE = 1;
  }

  // The instance class used for this task execution.
  InstanceClass instance_class = 16;
}