datafusion-ethers

Crates.iodatafusion-ethers
lib.rsdatafusion-ethers
version43.0.0
sourcesrc
created_at2024-05-28 14:18:28.079238
updated_at2024-11-08 23:10:17.159012
descriptionEthereum RPC bridge for Apache Datafusion
homepage
repositoryhttps://github.com/kamu-data/datafusion-ethers
max_upload_size
id1254430
size242,014
Sergii Mikhtoniuk (sergiimk)

documentation

README

Ethereum RPC Bridge for Apache Datafusion

Release CI Dependencies Chat

About

This adapter allows querying data from Ethereum-compatible blockchain nodes using SQL.

SQL queries are analyzed and optimized by Apache Datafusion engine and custom physical plan nodes translate them into Ethereum JSON-RPC calls to the node, maximally utilizing the predicate push-down. Node communications are done via alloy crate.

The crate also provides UDFs (a set of custom SQL functions) for ABI decoding.

The crate is currently designed for embedding, but the goal is also to provide an application that supports FlightSQL protocol and streaming data queries.

Warning about crates.io release

We have currently suspended publishing this crate to crates.io after migrating from ethers to alloy. This is because alloy team is not publishing their crate yet (see alloy-rs/alloy#791). Please use the git dependency for now.

Quick Start

Setup dependencies:

# Note: Alloy is still under active development and needs to be used via git
alloy = { git = "https://github.com/alloy-rs/alloy", branch = "main", features = [
    "provider-http",
    "provider-ws",
] }
datafusion = { version = "*" }
datafusion-ethers = { git = "https://github.com/kamu-data/datafusion-ethers", branch = "master" }

Initialize libraries and run queries:

// Create `alloy` RPC client
let rpc_client = ProviderBuilder::new()
    .on_builtin("http://localhost:8545")
    .await
    .unwrap();

// (Optional) Add config extension
let mut cfg = SessionConfig::new()
    .with_option_extension(datafusion_ethers::config::EthProviderConfig::default());

// Create `datafusion` session
let mut ctx = SessionContext::new_with_config(cfg);

// Register all UDFs
datafusion_ethers::udf::register_all(&mut ctx).unwrap();

// (Optional) Register JSON UDFs
datafusion_functions_json::register_all(&mut ctx).unwrap();

// Register catalog provider
ctx.register_catalog(
    "eth",
    Arc::new(datafusion_ethers::provider::EthCatalog::new(rpc_client)),
);

let df = ctx.sql("select * from eth.eth.logs limit 5").await.unwrap();
df.show().await.unwrap();

Example Queries

Get raw logs:

select *
from eth.eth.logs
where block_number between 0 and 5
limit 5
+--------------+------------------------------------------------------------------+----------------------+-------------------+------------------------------------------------------------------+-----------+------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| block_number | block_hash                                                       | block_timestamp      | transaction_index | transaction_hash                                                 | log_index | address                                  | topic0                                                           | topic1                                                           | topic2                                                           | topic3 | data                                                                                                                                                                                             |
+--------------+------------------------------------------------------------------+----------------------+-------------------+------------------------------------------------------------------+-----------+------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 3            | e5a51ad7ea21386e46be1a9f94ff96be0b5b9d4fd79a898a3aaa759d1dff6ae4 | 2024-06-07T08:14:44Z | 0                 | 944d0ecfa3e3d226b5af093570ba50d743313c0485f236a1414d4781777b5b00 | 0         | 5fbdb2315678afecb367f032d93f642f64180aa3 | d9e93ef3ac030ca8925f1725575c96d8a49bd825c0843a168225c1bb686bba67 | 000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266 | 000000000000000000000000000000000000000000000000000000000000007b |        |                                                                                                                                                                                                  |
| 3            | e5a51ad7ea21386e46be1a9f94ff96be0b5b9d4fd79a898a3aaa759d1dff6ae4 | 2024-06-07T08:14:44Z | 0                 | 944d0ecfa3e3d226b5af093570ba50d743313c0485f236a1414d4781777b5b00 | 1         | 5fbdb2315678afecb367f032d93f642f64180aa3 | da343a831f3915a0c465305afdd6b0f1c8a3c85635bb14272bf16b6de3664a51 | 0000000000000000000000005fbdb2315678afecb367f032d93f642f64180aa3 |                                                                  |        | 00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000005612d626172000000000000000000000000000000000000000000000000000000 |
| 4            | f379253db8ae7ce55c559fc8603b399caa163f95b7e4ef785f3fef50762cc9f2 | 2024-06-07T08:14:45Z | 0                 | 4da3936c231342e2855bc879c4c3a77724142c249bc15065e0c2fc0af28e8072 | 0         | e7f1725e7734ce288f8367e1bb143e90bb3f0512 | d9e93ef3ac030ca8925f1725575c96d8a49bd825c0843a168225c1bb686bba67 | 000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266 | 000000000000000000000000000000000000000000000000000000000000007b |        |                                                                                                                                                                                                  |
| 4            | f379253db8ae7ce55c559fc8603b399caa163f95b7e4ef785f3fef50762cc9f2 | 2024-06-07T08:14:45Z | 0                 | 4da3936c231342e2855bc879c4c3a77724142c249bc15065e0c2fc0af28e8072 | 1         | e7f1725e7734ce288f8367e1bb143e90bb3f0512 | da343a831f3915a0c465305afdd6b0f1c8a3c85635bb14272bf16b6de3664a51 | 000000000000000000000000e7f1725e7734ce288f8367e1bb143e90bb3f0512 |                                                                  |        | 00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000005612d626172000000000000000000000000000000000000000000000000000000 |
+--------------+------------------------------------------------------------------+----------------------+-------------------+------------------------------------------------------------------+-----------+------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+------------------------------------------------------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Note: the block_time column will only be populated if the node implementation supports it, see ethereum/execution-apis#295.

Get raw logs from a specific contract address:

select *
from eth.eth.logs
where address = X'5fbdb2315678afecb367f032d93f642f64180aa3'

Get raw logs matching specific signature:

select *
from eth.eth.logs
where topic0 = eth_event_selector('MyLog(address indexed addr, uint64 indexed id)')

Decode raw log data into JSON:

select
  eth_decode_event(
    'SendRequest(uint64 indexed requestId, address indexed consumerAddr, bytes request)',
    topic0,
    topic1,
    topic2,
    topic3,
    data
  ) as event
from eth.eth.logs
+-----------------------------------------------------------------------------------------------------------------------+
| event                                                                                                                 |
+-----------------------------------------------------------------------------------------------------------------------+
| {"consumerAddr":"aabbccddaabbccddaabbccddaabbccddaabbccdd","name":"SendRequest","request":"ff00bbaa","requestId":123} |
+-----------------------------------------------------------------------------------------------------------------------+

Extract decoded event into a nicely typed table (using datafusion-functions-json crate):

select
  json_get_str(event, 'name') as name,
  json_get_int(event, 'requestId') as request_id,
  decode(json_get_str(event, 'consumerAddr'), 'hex') as consumer_addr,
  decode(json_get_str(event, 'request'), 'hex') as request
from (
  select
    eth_decode_event(
      'SendRequest(uint64 indexed requestId, address indexed consumerAddr, bytes request)',
      topic0,
      topic1,
      topic2,
      topic3,
      data
    ) as event
  from eth.eth.logs
)
+-------------+------------+------------------------------------------+----------+
| name        | request_id | consumer_addr                            | request  |
+-------------+------------+------------------------------------------+----------+
| SendRequest | 123        | aabbccddaabbccddaabbccddaabbccddaabbccdd | ff00bbaa |
+-------------+------------+------------------------------------------+----------+

Note: Currently DataFusion does not allow UDFs to produce different resulting data types depending on the arguments. Therefore we cannot analyze the event signature literal and return a corresponding nested struct from the UDF. Therefore we have to use JSON as a return value. If you would like to skip JSON and go directly to the nested struct take a look at EthDecodedLogsToArrow and EthRawAndDecodedLogsToArrow transcoders.

Supported Features

  • Table: logs
    • Can be used to access transaction log events, with schema corresponding closely to eth_getLogs RPC method
    • Supports predicate push-down on block_number, block_hash, address, topic[0-3]
    • Supports limit
  • Config options:
    • block_range_from (default earliest) - lower boundry (inclusive) restriction on block range when pushing down predicate to the node
    • block_range_to (default latest) - upper boundry (inclusive) restriction on block range when pushing down predicate to the node",
  • UDF: eth_event_selector(<solidity log signature>): hash
    • Converts Solidity log signature into a hash
    • Useful for filtering events by topic0
  • UDF: eth_(try_)decode_event(<solidity log signature>), topic0, topic1, topic2, topic3, data): json string
  • Transcoders:
    • EthRawLogsToArrow - converts raw log data into Arrow record batches
    • EthDecodedLogsToArrow - converts decoded log data into Arrow record batches
    • EthRawAndDecodedLogsToArrow - given raw log data and an event type and produces Arrow record batches with raw data columns and with decoded event columns as a nested struct
  • Utilities:
    • RawLogsStream - implements efficient and resumable pagination of eth_getLogs using a provided filter

Related Projects

  • kamu-cli - verifiable data processing toolset that can ingest data from blockchains and provide it to smart contracts as a new-generation oracle.
Commit count: 34

cargo fmt