// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::borrow::Cow; use std::cmp::Ordering; use std::num::NonZeroU32; use std::pin::Pin; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; use futures::Stream; use num_bigint::BigInt; use serde::Deserialize; use serde::Serialize; use uuid::Uuid; use crate::codec::canonicalize_f64; pub type WatchStream = Pin<Box<dyn Stream<Item = Result<Vec<WatchKeyOutput>, anyhow::Error>>>>; #[async_trait(?Send)] pub trait Database: Clone + Sized { type QMH: QueueMessageHandle + 'static; async fn snapshot_read( &self, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, anyhow::Error>; async fn atomic_write( &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, anyhow::Error>; async fn dequeue_next_message( &self, ) -> Result<Option<Self::QMH>, anyhow::Error>; fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream; fn close(&self); } #[async_trait(?Send)] pub trait QueueMessageHandle { async fn take_payload(&mut self) -> Result<Vec<u8>, anyhow::Error>; async fn finish(&self, success: bool) -> Result<(), anyhow::Error>; } #[async_trait(?Send)] impl QueueMessageHandle for Box<dyn QueueMessageHandle> { async fn take_payload(&mut self) -> Result<Vec<u8>, anyhow::Error> { (**self).take_payload().await } async fn finish(&self, success: bool) -> Result<(), anyhow::Error> { (**self).finish(success).await } } /// Options for a snapshot read. #[derive(Clone, Debug)] pub struct SnapshotReadOptions { pub consistency: Consistency, } /// The consistency of a read. #[derive(Eq, PartialEq, Copy, Clone, Debug)] pub enum Consistency { Strong, Eventual, } /// A key is for a KV pair. It is a vector of KeyParts. /// /// The ordering of the keys is defined by the ordering of the KeyParts. The /// first KeyPart is the most significant, and the last KeyPart is the least /// significant. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] pub struct Key(pub Vec<KeyPart>); /// A key part is single item in a key. It can be a boolean, a double float, a /// variable precision signed integer, a UTF-8 string, or an arbitrary byte /// array. /// /// The ordering of a KeyPart is dependent on the type of the KeyPart. /// /// Between different types, the ordering is as follows: arbitrary byte array < /// UTF-8 string < variable precision signed integer < double float < false < true. /// /// Within a type, the ordering is as follows: /// - For a **boolean**, false is less than true. /// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN. /// - For a **variable precision signed integer**, the ordering must follow mathematical ordering. /// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering. /// - For an **arbitrary byte array**, the ordering must follow the byte ordering. /// /// This means that the key part `1.0` is less than the key part `2.0`, but is /// greater than the key part `0n`, because `1.0` is a double float and `0n` /// is a variable precision signed integer, and the ordering types obviously has /// precedence over the ordering within a type. #[derive(Clone, Debug)] pub enum KeyPart { Bytes(Vec<u8>), String(String), Int(BigInt), Float(f64), False, True, } impl KeyPart { fn tag_ordering(&self) -> u8 { match self { KeyPart::Bytes(_) => 0, KeyPart::String(_) => 1, KeyPart::Int(_) => 2, KeyPart::Float(_) => 3, KeyPart::False => 4, KeyPart::True => 5, } } } impl Eq for KeyPart {} impl PartialEq for KeyPart { fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal } } impl Ord for KeyPart { fn cmp(&self, other: &Self) -> Ordering { match (self, other) { (KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2), (KeyPart::String(s1), KeyPart::String(s2)) => { s1.as_bytes().cmp(s2.as_bytes()) } (KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2), (KeyPart::Float(f1), KeyPart::Float(f2)) => { canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2)) } _ => self.tag_ordering().cmp(&other.tag_ordering()), } } } impl PartialOrd for KeyPart { fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) } } /// A request to read a range of keys from the database. If `end` is `None`, /// then the range is from `start` shall also be used as the end of the range. /// /// The range is inclusive of the start and exclusive of the end. The start may /// not be greater than the end. /// /// The range is limited to `limit` number of entries. #[derive(Clone, Debug)] pub struct ReadRange { pub start: Vec<u8>, pub end: Vec<u8>, pub limit: NonZeroU32, pub reverse: bool, } /// A response to a `ReadRange` request. #[derive(Debug)] pub struct ReadRangeOutput { pub entries: Vec<KvEntry>, } /// A versionstamp is a 10 byte array that is used to represent the version of /// a key in the database. pub type Versionstamp = [u8; 10]; /// A key-value entry with a versionstamp. #[derive(Debug)] pub struct KvEntry { pub key: Vec<u8>, pub value: KvValue, pub versionstamp: Versionstamp, } /// A serialized value for a KV pair as stored in the database. All values /// **can** be serialized into the V8 representation, but not all values are. /// /// The V8 representation is an opaque byte array that is only meaningful to /// the V8 engine. It is guaranteed to be backwards compatible. Because this /// representation is opaque, it is not possible to inspect or modify the value /// without deserializing it. /// /// The inability to inspect or modify the value without deserializing it means /// that these values can not be quickly modified when performing atomic /// read-modify-write operations on the database (because the database may not /// have the ability to deserialize the V8 value into a modifiable value). /// /// Because of this constraint, there are more specialized representations for /// certain types of values that can be used in atomic read-modify-write /// operations. These specialized representations are: /// /// - **Bytes**: an arbitrary byte array. /// - **U64**: a 64-bit unsigned integer. #[derive(Debug)] pub enum KvValue { V8(Vec<u8>), Bytes(Vec<u8>), U64(u64), } /// A request to perform an atomic check-modify-write operation on the database. /// /// The operation is performed atomically, meaning that the operation will /// either succeed or fail. If the operation fails, then the database will be /// left in the same state as before the operation was attempted. If the /// operation succeeds, then the database will be left in a new state. /// /// The operation is performed by first checking the database for the current /// state of the keys, defined by the `checks` field. If the current state of /// the keys does not match the expected state, then the operation fails. If /// the current state of the keys matches the expected state, then the /// mutations are applied to the database. /// /// All checks and mutations are performed atomically. /// /// The mutations are performed in the order that they are specified in the /// `mutations` field. The order of checks is not specified, and is also not /// important because this ordering is un-observable. pub struct AtomicWrite { pub checks: Vec<Check>, pub mutations: Vec<Mutation>, pub enqueues: Vec<Enqueue>, } /// A request to perform a check on a key in the database. The check is not /// performed on the value of the key, but rather on the versionstamp of the /// key. pub struct Check { pub key: Vec<u8>, pub versionstamp: Option<Versionstamp>, } /// A request to perform a mutation on a key in the database. The mutation is /// performed on the value of the key. /// /// The type of mutation is specified by the `kind` field. The action performed /// by each mutation kind is specified in the docs for [MutationKind]. pub struct Mutation { pub key: Vec<u8>, pub kind: MutationKind, pub expire_at: Option<DateTime<Utc>>, } /// A request to enqueue a message to the database. This message is delivered /// to a listener of the queue at least once. /// /// ## Retry /// /// When the delivery of a message fails, it is retried for a finite number /// of times. Each retry happens after a backoff period. The backoff periods /// are specified by the `backoff_schedule` field in milliseconds. If /// unspecified, the default backoff schedule of the platform (CLI or Deploy) /// is used. /// /// If all retry attempts failed, the message is written to the KV under all /// keys specified in `keys_if_undelivered`. pub struct Enqueue { pub payload: Vec<u8>, pub deadline: DateTime<Utc>, pub keys_if_undelivered: Vec<Vec<u8>>, pub backoff_schedule: Option<Vec<u32>>, } /// The type of mutation to perform on a key in the database. /// /// ## Set /// /// The set mutation sets the value of the key to the specified value. It /// discards the previous value of the key, if any. /// /// This operand supports all [Value] types. /// /// ## Delete /// /// The delete mutation deletes the value of the key. /// /// ## Sum /// /// The sum mutation adds the specified value to the existing value of the key. /// /// This operand supports only value types [Value::U64]. The existing value in /// the database must match the type of the value specified in the mutation. If /// the key does not exist in the database, then the value specified in the /// mutation is used as the new value of the key. /// /// ## Min /// /// The min mutation sets the value of the key to the minimum of the existing /// value of the key and the specified value. /// /// This operand supports only value types [Value::U64]. The existing value in /// the database must match the type of the value specified in the mutation. If /// the key does not exist in the database, then the value specified in the /// mutation is used as the new value of the key. /// /// ## Max /// /// The max mutation sets the value of the key to the maximum of the existing /// value of the key and the specified value. /// /// This operand supports only value types [Value::U64]. The existing value in /// the database must match the type of the value specified in the mutation. If /// the key does not exist in the database, then the value specified in the /// mutation is used as the new value of the key. #[derive(Debug)] pub enum MutationKind { Set(KvValue), Delete, Sum { value: KvValue, min_v8: Vec<u8>, max_v8: Vec<u8>, clamp: bool, }, Min(KvValue), Max(KvValue), SetSuffixVersionstampedKey(KvValue), } impl MutationKind { pub fn value(&self) -> Option<&KvValue> { match self { MutationKind::Set(value) => Some(value), MutationKind::Sum { value, .. } => Some(value), MutationKind::Min(value) => Some(value), MutationKind::Max(value) => Some(value), MutationKind::SetSuffixVersionstampedKey(value) => Some(value), MutationKind::Delete => None, } } } /// The result of a successful commit of an atomic write operation. #[derive(Debug)] pub struct CommitResult { /// The new versionstamp of the data that was committed. pub versionstamp: Versionstamp, } #[derive(Debug)] /// The message notifying about the status of a single key in a watch request. pub enum WatchKeyOutput { /// The key has not changed since the last delivery. Deliver the entry. Unchanged, /// The key has changed since the last delivery. Deliver the new entry. Changed { entry: Option<KvEntry> }, } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct MetadataExchangeRequest { #[serde(default)] pub supported_versions: Vec<u64>, } /// The database metadata that is returned by the KV Connect metadata endpoint. #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DatabaseMetadata { pub version: u64, pub database_id: Uuid, pub endpoints: Vec<EndpointInfo>, pub token: Cow<'static, str>, pub expires_at: DateTime<Utc>, } /// An endpoint that can be used to connect to the database. #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EndpointInfo { pub url: Cow<'static, str>, // Using `String` instead of an enum, so that parsing doesn't // break if more consistency levels are added. pub consistency: Cow<'static, str>, } pub const VALUE_ENCODING_V8: i64 = 1; pub const VALUE_ENCODING_LE64: i64 = 2; pub const VALUE_ENCODING_BYTES: i64 = 3; /// Decode a value, returning None if the encoding is not understood. pub fn decode_value(value: Vec<u8>, encoding: i64) -> Option<KvValue> { let value = match encoding { VALUE_ENCODING_V8 => KvValue::V8(value), VALUE_ENCODING_BYTES => KvValue::Bytes(value), VALUE_ENCODING_LE64 => { let mut buf = [0; 8]; buf.copy_from_slice(&value); KvValue::U64(u64::from_le_bytes(buf)) } _ => return None, }; Some(value) } pub fn encode_value(value: &KvValue) -> (Cow<'_, [u8]>, i64) { match value { KvValue::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8), KvValue::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES), KvValue::U64(value) => { let mut buf = [0; 8]; buf.copy_from_slice(&value.to_le_bytes()); (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64) } } } pub fn encode_value_owned(value: KvValue) -> (Vec<u8>, i64) { match value { KvValue::V8(value) => (value, VALUE_ENCODING_V8), KvValue::Bytes(value) => (value, VALUE_ENCODING_BYTES), KvValue::U64(value) => { let mut buf = [0; 8]; buf.copy_from_slice(&value.to_le_bytes()); (buf.to_vec(), VALUE_ENCODING_LE64) } } }