Zephyr: Empowering Mercury Users with Custom Extensions to our Ingestion.

Welcome to the documentation for Zephyr, a code execution environment that extends the capabilities of Mercury, the data indexer built by xyclooLabs for developers and users of the Stellar Network, where Soroban is a first-class citizen.

Overview

Zephyr is a Virtual Machine developed on top of WebAssembly. This technology enables the execution of WASM modules within Mercury's ingestion flow, providing seamless access to the Stellar network's data stored in Mercury's database and ledger metadata.

The decision to build on WebAssembly stems from its proven efficiency, safety features developed in Rust, and its ability to offer a secure execution environment for users and infrastructure alike.

Zephyr's integration with Mercury unlocks a plethora of possibilities for the Stellar ecosystem. Users can now build specialized services without the need for setup or infrastructure. From protocol-centered services to user-specific applications, the Mercury + Zephyr combination opens up a world of innovative use cases.

Some use cases that are currently only partially possible* are:

  • Advanced Alert Systems: Empower traders and arbitrageurs to build highly customized alert and trading strategies without the complexities of managing databases or running instances.
  • Trackers: Effortlessly create watcher services to track the movement of funds across the Stellar network.
  • Multi-step Workflows: Facilitate complex processes by enabling workflows where each step depends on the result of the previous one.
  • Customized Indexing: Tailor database structures to specific querying needs with Zephyr's ingestion mechanisms.
  • User-defined Data Aggregations: Define personalized aggregation functions and calculations for unique requirements.
  • On-the-fly Subscriptions: Dynamically create subscriptions for specific data, allowing for real-time monitoring.
  • Custom Data Retention Policies: Empower users to manage data retention based on custom policies, optimizing costs in the long run.
  • Protocol Health Checks: Easily deploy watcher programs to monitor and maintain the health of protocols within the Stellar ecosystem.

*The latest release of the Zephyr VM and its integration within Mercury is still very experimental and a lot of key features are still missing (websockets, custom querying, more advanced DB access, making subscriptions, conditional triggering and others).

If this looks interesting and you wish to try it out, proceed to the next section to set up your projects to work with Zephyr.

Concepts SDK API Definition

Below are some important concepts about the ZephyrVM integration. We recommend reading this section before getting hands-on with the tutorial in the next sections.

Runs for every ledger

Zephyr programs run sequentially for every new ledger that is closed in the network. This means that once your program is deployed it will be run for every new ledger close since the moment of deployment. This is at least until we introduce conditional execution that will make Zephyr less expensive.

Before you deploy

Before you deploy your program and expect it to start working immediately, note that you first need to create all the tables that your program accesses.

In fact the general workflow for working with Zephyr is:

  1. write the program and take note of the tables and columns you access.
  2. create these tables with the columns in the database through the CLI.
  3. deploy the program.

If you deploy the program without having first created the correct tables through the CLI, the execution of the program will always exit unsuccessfully, and there is currently no logging infrastructure available to users that makes this easy to detect.

Symbols inherited from Soroban

For ease of implementation all table names and table columns are pretty much Soroban symbols. This means that they undergo all of the constraints that Soroban symbols have. They cannot exceed 9 characters and valid characters are a-zA-Z0-9_. This is an efficiency-driven decision.

Since, unlike the Soroban VM, we are using WASM's multivalue feature, we're considering extending the length of a symbol, but this is not currently implemented.
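
It can help to check names against these constraints before creating tables. The following is a minimal sketch of the two rules stated above (at most 9 characters, drawn from a-zA-Z0-9_). Note that is_valid_symbol is a hypothetical helper written for illustration; it is not part of the Zephyr SDK or the CLI.

```rust
/// Hypothetical helper (not part of the Zephyr SDK): checks the two
/// constraints described above for table and column names.
fn is_valid_symbol(name: &str) -> bool {
    const MAX_LEN: usize = 9;
    // Every accepted character is ASCII, so the byte length equals the
    // character count for any name that passes the second check.
    name.len() <= MAX_LEN
        && name.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_')
}
```

With this check, names used later in this documentation such as ledgers and sac_count pass, while a 10-character name such as sac_events or a name containing a dash is rejected.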

Slice-based communication

We haven't implemented any Zephyr types yet, so when you write to and read from the database, you're in charge of serializing and deserializing the contents to and from a byte slice. This will become clearer once you take a look at the next sections.
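
As a rough illustration of what this serialization looks like, here is a minimal sketch that round-trips a ledger sequence through big-endian bytes using only the standard library. It assumes the sequence is a u32, which matches the 4-byte values shown in the query results later in this documentation. The names encode_sequence and decode_sequence are hypothetical and not part of the SDK.

```rust
use std::convert::TryInto;

/// Encode a ledger sequence as 4 big-endian bytes, the same encoding
/// the examples in this documentation obtain with `to_be_bytes()`.
fn encode_sequence(sequence: u32) -> [u8; 4] {
    sequence.to_be_bytes()
}

/// Decode 4 big-endian bytes back into a sequence number.
/// Returns `None` when the slice is not exactly 4 bytes long.
fn decode_sequence(bytes: &[u8]) -> Option<u32> {
    let array: [u8; 4] = bytes.try_into().ok()?;
    Some(u32::from_be_bytes(array))
}
```

Returning an Option on decode, rather than panicking, is a design choice of this sketch: a Zephyr program that panics exits unsuccessfully, so a malformed value is easier to handle when it is surfaced as None.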

Environment

The guest program, i.e. your program, accesses the database and the ledger close metas through the environment object exported by the SDK (EnvClient in the examples in this documentation).

This object currently has the following functions:

  • db_write(&self, table_name: &str, columns: &[&str], segments: &[&[u8]]) -> Result<(), SdkError>, which writes the specified segments of data (always as byte slices) to the specified columns of the table table_name.

  • db_read(&self, table_name: &str, columns: &[&str]) -> Result<TableRows, SdkError>, which reads the specified columns from the table table_name. This returns a TableRows object, which wraps all the rows and columns:

// Current implementation treats these as named structs, but could change.
#[derive(Clone, Deserialize, Serialize)]
pub struct TableRows {
    pub rows: Vec<TableRow>,
}

#[derive(Clone, Deserialize, Serialize)]
pub struct TableRow {
    pub row: Vec<TypeWrap>,
}

#[derive(Clone, Deserialize, Serialize)]
pub struct TypeWrap(pub Vec<u8>);

As you can see, TableRows wraps the rows, and each TableRow wraps the requested columns. The value in each column of each row is a TypeWrap, i.e. a vector of bytes.

  • reader(&mut self) -> MetaReader returns the MetaReader object, which should help you in getting sections of the ledger meta more easily. However, the current implementation of the MetaReader is very limited. If you're searching for a part of the metadata that isn't easily extracted through the reader then use the last_ledger_meta_xdr() function.

  • last_ledger_meta_xdr(&mut self) -> &stellar_xdr::next::LedgerCloseMeta which returns the whole LedgerCloseMeta object (from Stellar's XDR definition).
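
To illustrate how a TableRows value returned by db_read might be consumed, here is a minimal sketch that interprets one column of every row as a big-endian i64. The three structs are local stand-ins that mirror the SDK definitions shown above (without the serde derives) so that the sketch compiles on its own. column_as_i64 is a hypothetical helper, not an SDK function.

```rust
use std::convert::TryInto;

// Local stand-ins mirroring the SDK structs shown above, so that this
// sketch is self-contained. In a real program these come from the SDK.
pub struct TableRows {
    pub rows: Vec<TableRow>,
}

pub struct TableRow {
    pub row: Vec<TypeWrap>,
}

pub struct TypeWrap(pub Vec<u8>);

/// Hypothetical helper: interpret column `col` of every row as a
/// big-endian i64, skipping rows where the column is missing or is not
/// exactly 8 bytes long.
fn column_as_i64(table: &TableRows, col: usize) -> Vec<i64> {
    table
        .rows
        .iter()
        .filter_map(|row| row.row.get(col))
        .filter_map(|cell| {
            let bytes: [u8; 8] = cell.0.as_slice().try_into().ok()?;
            Some(i64::from_be_bytes(bytes))
        })
        .collect()
}
```

The column index refers to the position in the columns slice passed to db_read, since each TableRow holds the requested columns in order.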

Zephyr's DB Access

Zephyr-executed programs have strict limitations on database reads and writes; for now, these are mainly access limitations. Only Zephyr tables (created through the CLI) that belong to your account are accessible. Mercury's built-in tables (contract events, ledger entries, payments, etc.) will never be writable by Zephyr and currently cannot be read either (though we plan on enabling this once we write the authorization part for it).

Querying Zephyr Tables

Zephyr tables you create and access don't actually show up in the database under the name you give them. Rather, their name is an MD5 hash of the table name you provide and your Mercury user id. This means that when querying your Zephyr table through GraphQL, you query the hash rather than the table name.

This is how the hash is generated:

let id = {
    let value = host.get_host_id();
    byte_utils::i64_to_bytes(value)
};

let write_point_hash: [u8; 16] = {
    let point_raw = stack.first().ok_or(HostError::NoValOnStack)?;
    let point_bytes = byte_utils::i64_to_bytes(*point_raw);

    md5::compute([point_bytes, id].concat()).into()
};

In short, the hash is the MD5 of the i64 representation of the table name symbol concatenated with your user id. In any case, the CLI outputs the actual table name, queryable from GraphQL, once you create the table.

Setup your Project

Note: this section assumes that you have already gone through at least the first steps of Soroban's setup. If you haven't, you'll need to install Rust and add wasm32-unknown-unknown as a target.

Install the Zephyr CLI

The Zephyr CLI will be needed to upload your programs and create the tables these will access.

cargo install zephyr-cli

Initialize the project

First, you need to create a new cargo library:

cargo new --lib zephyr-hello-ledger 

Add Zephyr SDK as Dependency

Next, you'll need to add the zephyr sdk to your dependencies. This will enable you to easily access the environment without directly communicating with it or with shared linear memory.

[package]
name = "zephyr-hello-ledger"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rs-zephyr-sdk = { path="../zephyr/rs-zephyr-sdk" }

[lib]
crate-type = ["cdylib"]

[profile.release]
opt-level = "z"
overflow-checks = true
debug = 0
strip = "symbols"
debug-assertions = false
panic = "abort"
codegen-units = 1
lto = true

Note that we've set cdylib as the crate type to produce a dynamic library, and set some release flags so that the resulting binaries don't get overly large.

Write a Hello Ledger Program

Now it's time to write a very simple Zephyr program that, for every ledger that closes, writes the ledger sequence and the number of transactions in the ledger's tx processing to a table named ledgers.

Entry point

The Zephyr environment only calls an exported on_close() -> () function. If the WASM module doesn't export such a function, nothing is executed.

#[no_mangle]
pub extern "C" fn on_close() {}

We disable name mangling and tell the compiler that this function has a C interface.

Getting ledger number and tx processing count

Next up is accessing the ledger meta to read the ledger sequence and number of transactions in the ledger's transaction processing section.

let mut env = EnvClient::default();
let reader = env.reader();

let sequence = reader.ledger_sequence();
let processing = reader.tx_processing();
let processing_length = processing.len();

Note that the reader is currently very incomplete, and chances are high that you'll have to deal with the whole ledger meta object yourself. In such cases, using let meta = env.last_ledger_meta_xdr() is recommended.

Writing to the database

Lastly, we want to write the sequence and the processing to the database's ledgers table:

env.db_write(
    "ledgers",
    &["sequence", "proc"],
    &[
        &sequence.to_be_bytes(),
        &processing_length.to_be_bytes(),
    ],
)
.unwrap();

Note that we're transforming the sequence and the processing length into arrays of bytes. This is needed because raw bytes are currently the only type we've defined for Zephyr host <> guest communication. As SDK development goes further, there will be more types (Arrays, Strings, Numbers, etc.) that can be sent.
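
One detail worth keeping in mind when encoding lengths: processing.len() is a usize, whose width depends on the compilation target (4 bytes on wasm32, 8 bytes on a typical 64-bit host). The following sketch pins the width explicitly so that the encoded value always has the same length. encode_count is a hypothetical helper, not part of the SDK.

```rust
use std::convert::TryFrom;

/// Hypothetical helper: encode a count as exactly 4 big-endian bytes,
/// regardless of the width of `usize` on the build target.
/// Returns `None` if the value does not fit in a u32.
fn encode_count(count: usize) -> Option<[u8; 4]> {
    u32::try_from(count).ok().map(u32::to_be_bytes)
}
```

A count of 2 encodes to the bytes 00 00 00 02, which is the same shape as the proc value shown in the query results later in this documentation.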

Summary

In the end, our Zephyr program should look like this:

use rs_zephyr_sdk::EnvClient;

#[no_mangle]
pub extern "C" fn on_close() {
    let mut env = EnvClient::default();
    let reader = env.reader();

    // Ledger sequence and number of entries in the tx processing section.
    let sequence = reader.ledger_sequence();
    let processing = reader.tx_processing();
    let processing_length = processing.len();

    // Write one row per ledger; every value is passed as a byte slice.
    env.db_write(
        "ledgers",
        &["sequence", "proc"],
        &[
            &sequence.to_be_bytes(),
            &processing_length.to_be_bytes(),
        ],
    )
    .unwrap();
}

Compiling

To compile the program to WASM:

cargo +nightly rustc --release --target=wasm32-unknown-unknown -- -C target-feature=+multivalue

As you can see, we're targeting a WASM release and we're also enabling WASM's multivalue compilation. Multivalue is used in Zephyr for efficiency of host <> guest interop.

This should compile the program to target/wasm32-unknown-unknown/release/zephyr_hello_ledger.wasm.

If you wish to optimize the program size, you can also use:

wasm-opt -Oz -o ./target/wasm32-unknown-unknown/release/zephyr_hello_ledger.optimized.wasm  ./target/wasm32-unknown-unknown/release/zephyr_hello_ledger.wasm --enable-multivalue

The last step is to create the ledgers table and upload the program. See the next chapter.

Zephyr CLI: Create table and Upload

Create the ledgers table

zephyr --jwt $JWT_TOKEN new-table --name "ledgers" --columns 'sequence' 'proc'

[+] Table "zephyr_d625b7bb470ff3fe8cd1351a1cbb7187" created successfully

Note that the table name and columns must abide by Soroban's short symbol rules: they cannot exceed 9 characters, and valid characters are a-zA-Z0-9_. This is an efficiency-driven decision. We are also considering extending the length using multivalue, but this is not currently implemented.

The above command will create the ledgers table (pertinent to the user specified by the jwt token) with columns sequence and proc.

Upload

Only after having created all the needed tables (with the correct columns) can you upload your program:

zephyr --jwt $JWT_TOKEN deploy --wasm ./target/wasm32-unknown-unknown/release/zephyr_hello_ledger.optimized.wasm

Query

Now you can query all rows in your Zephyr table:

query MyQuery {
  allZephyrD625B7Bb470Ff3Fe8Cd1351A1Cbb7187S {
    edges {
      node {
        sequence
        proc
      }
    }
  }
}

Example response:

{
  "data": {
    "allZephyrD625B7Bb470Ff3Fe8Cd1351A1Cbb7187S": {
      "edges": [
        {
          "node": {
            "sequence": "\\x000280d3",
            "proc": "\\x00000002"
          }
        },
        {"more": "nodes"}
      ]
    }
  }
}
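
Judging from the examples in this documentation, the GraphQL field name appears to be derived mechanically from the table name printed by the CLI: an "s" is appended, underscores are dropped, and the first letter as well as every letter following an underscore or a digit is upper-cased. The sketch below encodes that observation. It is an inference from the examples here, not a documented rule, and graphql_all_field is a hypothetical helper.

```rust
/// Hypothetical helper, inferred from the examples in this document:
/// builds the GraphQL "all rows" field name from the table name that
/// the CLI prints (e.g. `zephyr_<hash>`).
fn graphql_all_field(table_name: &str) -> String {
    let mut field = String::from("all");
    let mut upper_next = true;
    // The trailing 's' is the plural suffix seen in the example queries.
    for ch in table_name.chars().chain(std::iter::once('s')) {
        if ch == '_' {
            // Underscores are dropped and start a new capitalized word.
            upper_next = true;
        } else if ch.is_ascii_digit() {
            // Digits are kept; the next letter is capitalized.
            field.push(ch);
            upper_next = true;
        } else if upper_next {
            field.push(ch.to_ascii_uppercase());
            upper_next = false;
        } else {
            field.push(ch);
        }
    }
    field
}
```

For the table created above, graphql_all_field("zephyr_d625b7bb470ff3fe8cd1351a1cbb7187") yields the field name used in the query. If the endpoint's naming differs in other cases, treat the schema exposed by the GraphQL endpoint as authoritative.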

Examples

Below are some of the examples we've written as of now. If you've built something using zephyr and want it to appear here, we'd love for you to open an issue for it here.

These can also be found in the GitHub repo.

Network SAC Contracts

This program monitors the chain and records every time a Stellar Asset Contract (SAC) is wrapped, which enables Soroban usage of that token.

It also pushes the total number of deployed SACs by ledger to a table, making it possible to see the growth trend of wrapped assets.

These tables are public and can be queried as follows through the Mercury endpoint:

query SACTrend {
  allZephyr263707E6Cf91Cece33596A294D312A5Ds {
    edges {
      node {
        sequence
        number
      }
    }
  }
}

query AllSACs {
  allZephyrB8978D969636A31Fe5E95Efe542A1536S {
    edges {
      node {
        contract
        asset
      }
    }
  }
}

Code

use rs_zephyr_sdk::{
    stellar_xdr::next::{
        ContractIdPreimage, FeeBumpTransactionInnerTx, HostFunction, LedgerKey, Limits,
        OperationBody, ScAddress, TransactionEnvelope, TransactionExt, TransactionResultMeta,
        TransactionResultResult, TransactionV1Envelope, WriteXdr,
    },
    EnvClient,
};

struct CreatedSAC {
    contract_id: [u8; 32],
    asset: Vec<u8>,
}

#[no_mangle]
pub extern "C" fn on_close() {
    let mut env = EnvClient::default();
    let reader = env.reader();

    let envelopes = reader.envelopes();
    let processing = reader.tx_processing();
    let current_ledger = reader.ledger_sequence();

    let mut created = Vec::new();

    for (idx, envelope) in envelopes.iter().enumerate() {
        match envelope {
            TransactionEnvelope::Tx(tx) => write_from_v1(idx, tx, &processing, &mut created),

            // v0 txs cannot include soroban data
            TransactionEnvelope::TxV0(_) => (),

            // fee bump txs wrap a v1 tx, which is inspected the same way
            TransactionEnvelope::TxFeeBump(tx) => match &tx.tx.inner_tx {
                FeeBumpTransactionInnerTx::Tx(tx) => {
                    write_from_v1(idx, &tx, &processing, &mut created)
                }
            },
        }
    }

    // add all created SACs to the database.
    for sac in &created {
        env.db_write(
            "sacs",
            &["contract", "asset"],
            &[&sac.contract_id, &sac.asset],
        )
        .unwrap();
    }

    // if at least one SAC was deployed, add a new record to the historical trend
    let num_created = created.len() as i64;
    if num_created > 0 {
        let previous_sacs = env.db_read("sac_count", &["number"]);
        if let Ok(rows) = previous_sacs {
            if let Some(last) = rows.rows.last() {
                // the last row holds the running total as 8 big-endian bytes
                let mut byte_array: [u8; 8] = [0; 8];
                let int = &last.row[0].0;
                byte_array.copy_from_slice(&int[..int.len()]);
                let tot_sacs = i64::from_be_bytes(byte_array) + num_created;

                env.db_write(
                    "sac_count",
                    &["sequence", "number"],
                    &[&current_ledger.to_be_bytes(), &tot_sacs.to_be_bytes()],
                )
                .unwrap()
            } else {
                // no previous record: the trend starts with this ledger's count
                env.db_write(
                    "sac_count",
                    &["sequence", "number"],
                    &[&current_ledger.to_be_bytes(), &num_created.to_be_bytes()],
                )
                .unwrap()
            }
        }
    }
}

fn write_from_v1(
    idx: usize,
    tx: &TransactionV1Envelope,
    processing: &Vec<TransactionResultMeta>,
    created: &mut Vec<CreatedSAC>,
) {
    match &tx.tx.operations.get(0).unwrap().body {
        // we search for create SAC operations
        OperationBody::InvokeHostFunction(op) => {
            if let HostFunction::CreateContract(create_contract) = &op.host_function {
                if let ContractIdPreimage::Asset(asset) = &create_contract.contract_id_preimage {
                    let matching_processing = processing.get(idx).unwrap();

                    // we make sure that the tx was successful
                    if let TransactionResultResult::TxSuccess(_) =
                        matching_processing.result.result.result
                    {
                        if let TransactionExt::V1(soroban) = &tx.tx.ext {
                            // the contract id is taken from the first read-write footprint entry
                            if let LedgerKey::ContractData(data) =
                                &soroban.resources.footprint.read_write[0]
                            {
                                if let ScAddress::Contract(contract) = &data.contract {
                                    created.push(CreatedSAC {
                                        contract_id: contract.0,
                                        asset: asset.to_xdr(Limits::none()).unwrap(),
                                    });
                                }
                            }
                        }
                    }
                }
            }
        }

        _ => (),
    }
}
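
The running-total step of the program above decodes the last stored value with copy_from_slice, which panics when the source and destination lengths differ. Below is a minimal sketch of the same step in isolation, written so that a malformed stored value is reported instead of causing a panic. next_total is a hypothetical helper, not part of the SDK or of the deployed program.

```rust
use std::convert::TryInto;

/// Hypothetical helper: given the bytes of the last stored running
/// total (if any) and the number of SACs created in this ledger,
/// return the new running total.
/// Returns `None` if the stored value is not exactly 8 bytes long or
/// if the addition would overflow.
fn next_total(last_stored: Option<&[u8]>, num_created: i64) -> Option<i64> {
    match last_stored {
        // No previous record: the trend starts with this ledger's count.
        None => Some(num_created),
        Some(bytes) => {
            let array: [u8; 8] = bytes.try_into().ok()?;
            i64::from_be_bytes(array).checked_add(num_created)
        }
    }
}
```

In the program above, last_stored would correspond to the first column of the last row returned by db_read, and the result would be written back with to_be_bytes().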

Track SAC Events

Note: this program relies on the previous SAC tracker program.

This program tracks all events of the SACs registered from the previous program.

Currently deployed version

This program is already live on Zephyr and is tracking events for SACs wrapped in and after ledger 177442 and its data can be accessed as follows:

query SACEvents {
  allZephyr86882807C5E507349D54F6F33Fc8229As {
    edges {
      node {
        sequence
        contract
        topic1
        topic2
        topic3
        topic4
        data
      }
    }
  }
}

Example response:

{
  "data": {
    "allZephyr86882807C5E507349D54F6F33Fc8229As": {
      "edges": [
        {
          "node": {
            "sequence": "\\x0002b522",
            "contract": "\\xc184ad97f64befba0907b3cef6b570155c16658fcef92df221147d1592faa3e0",
            "topic1": "\\x0000000f000000046d696e74",
            "topic2": "\\x000000120000000000000000d08a167b577b96595971d6884e6a3097affb238ca5947585ead11660fa23676f",
            "topic3": "\\x00000012000000000000000047447cded9fa966bd551e683c1d39d5e9b32361f1a6483c15382f7684751bea0",
            "topic4": "\\x0000000e0000003c54454d3a47444949554654334b35355a4d574b5a4f484c495154544b47434c3237365a445253535a49354d46354c49524d594832454e545737374248",
            "data": "\\x0000000a00000000000000000000000000000190"
          }
        }
      ]
    }
  }
}

Remember that all this data is hex-encoded bytes. topic1 through topic4 and data should be parsed as ScVals (ScVal::from_xdr(...)). contract is the inner value of the contract's Hash, so you can easily build the corresponding string using the stellar-strkey libs. sequence is the big-endian byte representation of the ledger sequence, a 32-bit integer (the value above is 4 bytes long). For example, you can parse it in JS as follows:

const hex = "\\x0002b522";
const cleanHex = hex.replace(/\\x/g, '');
const result = parseInt(cleanHex, 16);

console.log(result); // 177442
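
For readers consuming these results from Rust, the same parsing can be sketched with the standard library only. The sketch assumes the value has already been JSON-decoded, so it starts with a single backslash followed by x. decode_hex_bytes and sequence_from_hex are hypothetical helpers.

```rust
use std::convert::TryInto;

/// Hypothetical helper: decode a value such as `\x0002b522` (an
/// optional `\x` prefix followed by hex digits) into raw bytes.
/// Returns `None` on an odd number of digits or a non-hex character.
fn decode_hex_bytes(value: &str) -> Option<Vec<u8>> {
    let digits = value.strip_prefix("\\x").unwrap_or(value);
    if digits.len() % 2 != 0 || !digits.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    // All characters are ASCII at this point, so slicing by byte index is safe.
    (0..digits.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&digits[i..i + 2], 16).ok())
        .collect()
}

/// Hypothetical helper: decode a 4-byte big-endian sequence column.
fn sequence_from_hex(value: &str) -> Option<u32> {
    let bytes = decode_hex_bytes(value)?;
    let array: [u8; 4] = bytes.as_slice().try_into().ok()?;
    Some(u32::from_be_bytes(array))
}
```

The bytes returned by decode_hex_bytes for the topic and data columns are the XDR that ScVal::from_xdr(...) expects, as described above.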

Code

use rs_zephyr_sdk::{
    stellar_xdr::next::{ContractEventBody, Limits, TransactionMeta, WriteXdr},
    EnvClient,
};

#[no_mangle]
pub extern "C" fn on_close() {
    let mut env = EnvClient::default();
    let reader = env.reader();

    let sequence = reader.ledger_sequence();
    let processing = reader.tx_processing();

    // contract ids of the SACs recorded by the previous program
    let sacs = env.db_read("sacs", &["contract"]).unwrap();
    let tracked_deployed_sacs: Vec<&Vec<u8>> = sacs.rows.iter().map(|row| &row.row[0].0).collect();

    for tx_processing in processing {
        if let TransactionMeta::V3(meta) = &tx_processing.tx_apply_processing {
            if let Some(soroban) = &meta.soroban_meta {
                if !soroban.events.is_empty() {
                    for event in soroban.events.iter() {
                        let contract_id = event.contract_id.as_ref().unwrap().0;
                        // only keep events emitted by tracked SACs
                        if tracked_deployed_sacs.contains(&contract_id.to_vec().as_ref()) {
                            // topics and data are stored as XDR-encoded ScVals
                            let (topics, data) = match &event.body {
                                ContractEventBody::V0(v0) => (
                                    v0.topics
                                        .iter()
                                        .map(|topic| topic.to_xdr(Limits::none()).unwrap())
                                        .collect::<Vec<Vec<u8>>>(),
                                    v0.data.to_xdr(Limits::none()).unwrap(),
                                ),
                            };
                            // missing topics are written as empty byte vectors
                            env.db_write(
                                "sac_event",
                                &[
                                    "sequence", "contract", "topic1", "topic2", "topic3", "topic4",
                                    "data",
                                ],
                                &[
                                    &sequence.to_be_bytes(),
                                    &contract_id,
                                    &topics.get(0).unwrap_or(&vec![]),
                                    &topics.get(1).unwrap_or(&vec![]),
                                    &topics.get(2).unwrap_or(&vec![]),
                                    &topics.get(3).unwrap_or(&vec![]),
                                    &data,
                                ],
                            )
                            .unwrap()
                        }
                    }
                }
            }
        }
    }
}
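
The sac_event table has a fixed set of four topic columns, while an event may carry fewer topics; the program above fills the missing ones with empty byte vectors. The following sketch shows that padding step in isolation. topic_columns is a hypothetical helper, and topics beyond the fourth are ignored, as in the program above.

```rust
/// Hypothetical helper: map a variable number of encoded topics onto
/// exactly four column values, padding with empty slices.
fn topic_columns(topics: &[Vec<u8>]) -> [&[u8]; 4] {
    const EMPTY: &[u8] = &[];
    let mut columns = [EMPTY; 4];
    // `zip` stops at the shorter side, so extra topics are dropped and
    // missing topics keep the empty default.
    for (slot, topic) in columns.iter_mut().zip(topics) {
        *slot = topic.as_slice();
    }
    columns
}
```

The returned array borrows from the topics slice, so it can be passed to a write call without copying the encoded topics.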