shors

version: 0.12.1
created_at: 2022-08-19 14:14:13.496937
updated_at: 2024-10-27 23:33:12.364449
description: Transport layer for cartridge + tarantool-module projects.
repository: https://git.picodata.io/picodata/picodata/http-router
id: 648717
size: 93,456
(picodata-account)


SHORS

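Shors is a library for building the transport layer of distributed systems based on tarantool-module. It contains four components, described in the sections below: an HTTP server, an RPC server, an RPC client, and builtin middlewares.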

HTTP

Create http route

Use route::Builder to create http routes. Once a route is created, register it with the http Server.

Example:

use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request};
use shors::transport::Context;
use std::error::Error;

fn make_http_endpoint() {
  let endpoint = Builder::new()
          .with_method("GET")
          .with_path("/concat/a/:a/b/:b")
          .build(
            |_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
              let a = request
                      .stash
                      .get("a")
                      .map(|s| s.as_str())
                      .unwrap_or_default();
              let b = request
                      .stash
                      .get("b")
                      .map(|s| s.as_str())
                      .unwrap_or_default();

              Ok(a.to_string() + b)
            },
          );

  let s = server::Server::new();
  s.register(Box::new(endpoint));
}
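
The route above captures the :a and :b path segments into request.stash. As a rough illustration of the idea only, the sketch below shows how such a pattern could be matched and how the captured values could feed the handler logic. The match_path and concat functions are hypothetical helpers written for this illustration; they are not part of the shors API, and shors may implement matching differently.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not a shors function): matches `path` against
/// `pattern`. A pattern segment starting with ':' captures the path segment
/// at the same position under that name.
fn match_path(pattern: &str, path: &str) -> Option<HashMap<String, String>> {
    let pattern_segments: Vec<&str> = pattern.trim_matches('/').split('/').collect();
    let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();
    if pattern_segments.len() != path_segments.len() {
        return None;
    }

    let mut stash = HashMap::new();
    for (pat, seg) in pattern_segments.iter().zip(path_segments.iter()) {
        if let Some(name) = pat.strip_prefix(':') {
            // Named segment: remember the value.
            stash.insert(name.to_string(), seg.to_string());
        } else if pat != seg {
            // Literal segment that does not match.
            return None;
        }
    }
    Some(stash)
}

/// Mirrors the handler body from the example above: missing keys fall back
/// to an empty string.
fn concat(stash: &HashMap<String, String>) -> String {
    let a = stash.get("a").map(|s| s.as_str()).unwrap_or_default();
    let b = stash.get("b").map(|s| s.as_str()).unwrap_or_default();
    a.to_string() + b
}
```

Under these assumptions, a request to /concat/a/foo/b/bar would yield a stash with a = "foo" and b = "bar", and the handler would respond with "foobar".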

A more complex example (with groups, error handling, custom and builtin middlewares):

use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::trace::Tracer;
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request, Response};
use shors::transport::Context;
use shors::{middleware, shors_error};
use std::error::Error;

static OPENTELEMETRY_TRACER: Lazy<Tracer> = Lazy::new(|| stdout::new_pipeline().install_simple());

pub fn make_http_endpoints() {
  let route_group = Builder::new()
          .with_path("/v1")
          .with_middleware(|route| {
            println!("got new http request!");
            route
          })
          .with_middleware(middleware::http::otel(&OPENTELEMETRY_TRACER))
          .group();

  #[derive(serde::Deserialize)]
  struct EchoRequest {
    text: String,
  }

  let echo = route_group
          .builder()
          .with_method("POST")
          .with_path("/echo")
          .build(
            |_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
              let req: EchoRequest = request.parse()?;
              Ok(req.text)
            },
          );

  let ping = route_group
          .builder()
          .with_method("GET")
          .with_path("/ping")
          .build(|_ctx: &mut Context, _request: Request| -> Result<_, Box<dyn Error>> { Ok("pong") });

  let s = server::Server::new();
  s.register(Box::new(echo));
  s.register(Box::new(ping));
}
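
In the example above, each middleware is a function that takes a route and returns a route. The following standalone sketch illustrates that wrapping pattern with simplified, hypothetical types (Handler, Middleware, apply, logging); none of them are shors types. The ordering chosen here, where the first registered middleware runs outermost, is an assumption and may differ from what shors does.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Simplified stand-ins for a route handler and a middleware (hypothetical types).
type Handler = Box<dyn Fn(&str) -> String>;
type Middleware = Box<dyn Fn(Handler) -> Handler>;

/// Wraps `handler` with every middleware. Folding in reverse makes the first
/// middleware in the list the outermost one (an assumption of this sketch).
fn apply(handler: Handler, middlewares: Vec<Middleware>) -> Handler {
    middlewares.into_iter().rev().fold(handler, |h, mw| mw(h))
}

/// A middleware that records every request in a shared log before
/// delegating to the wrapped handler.
fn logging(log: Rc<RefCell<Vec<String>>>, label: &'static str) -> Middleware {
    Box::new(move |next: Handler| -> Handler {
        let log = Rc::clone(&log);
        Box::new(move |req: &str| {
            log.borrow_mut().push(format!("{label}: {req}"));
            next(req)
        })
    })
}

/// Builds a "ping" route wrapped in two logging middlewares and calls it once.
fn demo() -> (String, Vec<String>) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let ping: Handler = Box::new(|_req: &str| "pong".to_string());
    let route = apply(
        ping,
        vec![
            logging(Rc::clone(&log), "outer"),
            logging(Rc::clone(&log), "inner"),
        ],
    );
    let response = route("/v1/ping");
    let entries = log.borrow().clone();
    (response, entries)
}
```

Because a middleware only sees the next handler, it can run code before or after the call, or skip the call entirely, without the handler knowing about it.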

Add OpenAPI docs

First, add shors = { ..., features = ["open-api"] } to your project's Cargo.toml. Then call the .define_open_api method on the route builder to define the OpenAPI operation. Under the hood, shors uses the utoipa crate to build the OpenAPI schema. For convenience, this crate is re-exported as shors::utoipa.

Important: if you use derive macros from shors::utoipa, add this line to the .rs file:

use shors::utoipa as utoipa;

so that the derive macros work correctly.

To access the resulting OpenAPI document, use the shors::transport::http::openapi::with_open_api function. See the test application routes for usage examples.

To serve Swagger UI, see the shors::transport::http::openapi::swagger_ui_route function.

RPC

Prepare

The RPC transport requires an exported stored procedure, rpc_handler.

Create the stored procedure. Example (where RPC_SERVER is the Server instance):

#[no_mangle]
pub extern "C" fn rpc_handler(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
    RPC_SERVER.with(|srv| srv.handle(ctx, args))
}

Then export it from the cartridge role. Example:

    box.schema.func.create('mylib.rpc_handler', { language = 'C', if_not_exists = true })
    rawset(_G, 'rpc_handler', function(path, ctx, mp_request)
      return box.func['mylib.rpc_handler']:call({ path, ctx, mp_request })
    end)

Create rpc routes

Working with rpc routes is the same as with http routes: use route::Builder to create rpc routes. Once a route is created, register it with the rpc Server.

A complex example:

use once_cell::unsync::Lazy;
use shors::log::RequestIdOwner;
use shors::transport::rpc::server::Server;
use shors::transport::{rpc, Context};
use std::error::Error;
use shors::{middleware, shors_error};

thread_local! {
    pub static RPC_SERVER: Lazy<Server> = Lazy::new(Server::new);
}

#[tarantool::proc]
fn init_rpc() -> tarantool::Result<()> {
  let routes = rpc::route::Builder::new()
          .with_error_handler(|ctx, err| {
            shors_error!(ctx: ctx, "rpc error {}", err);
          })
          .with_middleware(|route| {
            println!("got new rpc request!");
            route
          })
          // OPENTELEMETRY_TRACER is assumed to be a static tracer, defined as in the HTTP example above.
          .with_middleware(middleware::rpc::otel(&OPENTELEMETRY_TRACER))
          .group();
  
  let sum_route = routes.builder().with_path("/sum").build(
    |_ctx: &mut Context, req: rpc::Request| -> Result<_, Box<dyn Error>> {
      let numbers = req.parse::<Vec<u64>>()?;
      Ok(numbers.into_iter().sum::<u64>())
    },
  );

  RPC_SERVER.with(|srv| {
    srv.register(Box::new(sum_route));
  });

  Ok(())
}
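
The Lua shim shown earlier passes a path to rpc_handler, and the server then picks the route registered for that path. The sketch below models this dispatch step with a plain map from path to handler. Registry, RpcHandler and sum_handler are hypothetical names used only for this illustration; the real shors Server works with msgpack requests and a Context, which are left out here.

```rust
use std::collections::HashMap;

// Simplified handler type (hypothetical): takes already-decoded numbers.
type RpcHandler = Box<dyn Fn(&[u64]) -> Result<u64, String>>;

/// A minimal path -> handler table, standing in for the rpc Server.
#[derive(Default)]
struct Registry {
    routes: HashMap<String, RpcHandler>,
}

impl Registry {
    fn register(&mut self, path: &str, handler: RpcHandler) {
        self.routes.insert(path.to_string(), handler);
    }

    /// Looks up the route by path and invokes it, or reports a missing route.
    fn handle(&self, path: &str, args: &[u64]) -> Result<u64, String> {
        match self.routes.get(path) {
            Some(handler) => handler(args),
            None => Err(format!("no route registered for {path}")),
        }
    }
}

/// Same idea as the "/sum" route above, but with overflow reported as an
/// error instead of a panic.
fn sum_handler() -> RpcHandler {
    Box::new(|numbers: &[u64]| {
        numbers
            .iter()
            .try_fold(0u64, |acc, n| acc.checked_add(*n))
            .ok_or_else(|| "sum overflows u64".to_string())
    })
}
```

In this model, registering sum_handler under "/sum" and handling a call with the numbers 2, 3 and 4 returns 9, while a call to an unregistered path returns an error.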

RPC client

There is a dedicated component for interacting with remote rpc endpoints. Currently, the client can call an rpc endpoint in five modes:

  • by bucket_id (vshard)
  • by bucket_id (vshard) async (without waiting for a response)
  • by replicaset id (call current master)
  • by cartridge role (call current master)
  • by cartridge role with uri (call an instance by uri, which may not be the current master)

Prepare

The RPC client requires some Lua code to be registered, either in a luaopen_ function or in an init function. A luaopen_ function is called when the related library is loaded (for example, from the init.lua file); alternatively, the RPC client can be initialized explicitly in an init function. Both options are shown in the following examples.

Examples:

Initializing the RPC client directly from an init function

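#[proc]
fn init_rpc_client() {
  init_logger();
  let lua = unsafe { tlua::StaticLua::from_static(tarantool::lua_state().as_lua()) };
  // init_lua_functions returns a Result (see the luaopen example below), so handle it explicitly.
  shors::init_lua_functions(&lua).unwrap();
}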

Defining a luaopen-function

#[no_mangle]
pub unsafe extern "C" fn luaopen_libstub(l: *mut ffi_lua::lua_State) -> c_int {
    let lua = tlua::StaticLua::from_static(l);
    shors::init_lua_functions(&lua).unwrap();
    1
}

Call rpc endpoint by bucket_id

    let lua = tarantool::lua_state();

    let params = vec![2, 3, 4];
    let resp = rpc::client::Builder::new(&lua)
        .shard_endpoint("/add")
        .call(&mut Context::background(), bucket_id, &params)?;

Call rpc endpoint by bucket_id async

    let lua = tarantool::lua_state();

    rpc::client::Builder::new(&lua)
        .async_shard_endpoint("/ping")
        .call(&mut Context::background(), bucket_id, &())?;

Call rpc endpoint by replicaset uuid

    let lua = tarantool::lua_state();

    let params = vec![2, 3, 4];
    let resp = rpc::client::Builder::new(&lua)
        .replicaset_endpoint("/add")
        .prefer_replica()
        .call(&mut Context::background(), rs_uuid, &params)?;

Call rpc endpoint by cartridge role

NOTE: calling an rpc endpoint by role requires the exported rpc_handler stored procedure to be registered as an exported role method. For example:

return {
    role_name = 'app.roles.stub',
    ...
    rpc_handler = function(path, ctx, mp_request)
      return box.func['libstub.rpc_handler']:call({ path, ctx, mp_request })
    end,
}

Call example:

    let lua = tarantool::lua_state();

    let resp = rpc::client::Builder::new(&lua)
      .role_endpoint("app.roles.stub", "/ping")
      .call(&mut Context::background(), &())?;

Call rpc endpoint by cartridge role with uri

NOTE: this mode also depends on rpc_handler (see the "Call rpc endpoint by cartridge role" section above).

    let lua = tarantool::lua_state();

    let resp = rpc::client::Builder::new(&lua)
      .role_endpoint("app.roles.stub", "/ping")
      .with_uri("localhost:3031")
      .call(&mut Context::background(), &())?;

Builtin middlewares

  • http server
    • debug - print debug information in debug logs
    • otel - opentelemetry tracing
    • otel_conditional - opentelemetry tracing, disabled unless the with-trace http header is set
    • access_logs - nginx-like access logs
  • rpc server
    • debug - print debug information in debug logs
    • otel - opentelemetry tracing
    • record_latency - record route latency as a prometheus metric
  • rpc client
    • otel - opentelemetry tracing
    • retry - retry the call on server-side errors
    • record_latency - record call latency as a prometheus metric
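
To make the retry idea from the list above concrete, here is a standalone sketch of retrying a call only when the failure is a server-side error. CallError and with_retry are hypothetical names for this illustration and do not reflect the signature or the retry policy of the shors retry middleware.

```rust
/// Hypothetical error type that separates retryable from non-retryable failures.
#[derive(Debug, PartialEq)]
enum CallError {
    /// The server failed; repeating the call may succeed.
    Server(String),
    /// The request itself is invalid; retrying cannot help.
    Client(String),
}

/// Calls `call` up to `max_attempts` times, repeating only on server errors.
/// The last result (success or error) is returned to the caller.
fn with_retry<T>(
    max_attempts: u32,
    mut call: impl FnMut() -> Result<T, CallError>,
) -> Result<T, CallError> {
    let mut attempt = 1;
    loop {
        match call() {
            // Server-side error with attempts left: try again.
            Err(CallError::Server(_)) if attempt < max_attempts => attempt += 1,
            // Success, client error, or attempts exhausted: stop.
            other => return other,
        }
    }
}
```

A production version would usually also wait between attempts (for example with a backoff) and would retry only calls that are safe to repeat.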

Testing

Unit

  make unit-test

Integration

  (cd tests/testapplication/ && cartridge build)
  make int-test

Request pipeline (relevant to 0.1.x versions)

NOTE: the text below does not apply to shors v0.2.0+.

Shors uses the vshard/cartridge API under the hood to make remote requests. Both the cartridge and vshard APIs are Lua APIs. With that in mind, here is the pipeline of a shors rpc request:

Client side

  • Rust side:
    1. the sender creates an instance of rpc::client::Client
    2. the sender calls the rpc::client::Client::call method with a Rust structure that represents the request payload
    3. the Rust structure is serialized into a Lua table (using the tarantool::tlua Push trait)
  • Lua side:
    1. a vshard or cartridge method is called with the Lua table from the previous step
    2. the vshard or cartridge API calls iproto
    3. iproto serializes the Lua table into msgpack and sends it to the server side

Server side

  • Lua side:
    1. a message is received from iproto; it is represented as a tarantool tuple
    2. the Rust handler function is called using the internal routes table
  • Rust side:
    1. the .decode method is called on rpc::Request to restore the request

So the serialization/deserialization scheme looks like this: Rust structure -> Lua table -> msgpack representation -> Rust structure

This scheme has a problem: what if a field of the initial Rust structure is an enum? For example:

#[derive(Debug, Deserialize, Serialize, tlua::Push)]
enum Value {
  String(String),
  Code(String),
}

#[derive(Debug, Deserialize, Serialize, tlua::Push)]
struct Foo {
  bar: Value,
}

let example = Foo { bar: Value::Code("abc".to_string()) };

tarantool::tlua serializes this struct into a Lua table like this:

{
  bar = "abc"
}

As you can see, the information about which enum variant was used is lost. Later, when this Lua table is serialized into msgpack and then deserialized into a Rust structure, serde returns an error: it expects type information to pick the variant, but there is none. If the Lua table instead looked like this:

{
  bar = { Code = "abc" }
}

deserialization would succeed without errors.
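
The difference between the two table shapes can be reproduced without tarantool or serde. The sketch below uses a tiny hypothetical LuaValue model (not a tlua type) to show that the untagged form maps both variants to the same value, so the variant cannot be restored, while the tagged form round-trips. The push_untagged, push_tagged and restore functions exist only for this illustration.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Value {
    String(String),
    Code(String),
}

/// Minimal stand-in for a Lua value (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum LuaValue {
    Str(String),
    Table(Vec<(String, LuaValue)>),
}

/// Lossy encoding: only the payload is pushed, as in `bar = "abc"`.
fn push_untagged(v: &Value) -> LuaValue {
    match v {
        Value::String(s) | Value::Code(s) => LuaValue::Str(s.clone()),
    }
}

/// Tagged encoding: the variant name is the key, as in `bar = { Code = "abc" }`.
fn push_tagged(v: &Value) -> LuaValue {
    let (tag, payload) = match v {
        Value::String(s) => ("String", s),
        Value::Code(s) => ("Code", s),
    };
    LuaValue::Table(vec![(tag.to_string(), LuaValue::Str(payload.clone()))])
}

/// Restores the enum. Only the tagged form carries enough information.
fn restore(lua: &LuaValue) -> Result<Value, String> {
    match lua {
        LuaValue::Table(fields) => match fields.as_slice() {
            [(tag, LuaValue::Str(s))] if tag.as_str() == "String" => Ok(Value::String(s.clone())),
            [(tag, LuaValue::Str(s))] if tag.as_str() == "Code" => Ok(Value::Code(s.clone())),
            _ => Err("unknown variant".to_string()),
        },
        LuaValue::Str(_) => Err("missing variant tag".to_string()),
    }
}
```

In this model, Value::Code("abc") and Value::String("abc") produce identical untagged values, which is exactly why the receiving side cannot decide which variant to build.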

How can we fix this? Currently, the most generic approach is to implement the tlua::Push trait for the enum manually. For this example (note that this is an illustrative implementation; do not create a hashmap in production code):

impl<L> tlua::Push<L> for Value
where
  L: tlua::AsLua,
{
  type Err = TuplePushError<tlua::Void, tlua::Void>;

  fn push_to_lua(&self, lua: L) -> Result<PushGuard<L>, (Self::Err, L)> {
    let mut hm = HashMap::<String, String>::new();
    match self {
      Value::String(s) => hm.insert("String".to_string(), s.to_string()),
      Value::Code(s) => hm.insert("Code".to_string(), s.to_string()),
    };
    hm.push_to_lua(lua)
  }
}

impl<L> tlua::PushOne<L> for Value where L: tlua::AsLua {}