# Using WebSockets

WARNING

This section is still experimental, and the interface needs more refinement.

As the blockchain's state gets updated with each new block, you may find it desirable to continuously monitor for specific events that fall within a criteria, and to be notified as they occur, without the need to manually periodically poll for state changes, which leads to wasted network resources. Such a program is called a listener, and can be implemented with Jigu by specifying the following:

  1. The Tendermint event to subscribe to

  2. A query to further filter events by content

  3. An event handler, a function you write that gets passed the event data

You can write listeners atop the Jigu WebSocket API, which is a wrapper to the Tendermint RPC interface of your node. To do so, you need to be permitted to access a Terra node's RPC endpoint's /websocket endpoint. Once you've defined your listener, you will pass it in run_listener to start subscribing and handling events.

# Creating Listeners

A listener works through creating a persistent connection to a node through WebSockets, subscribing to a specific Tendermint event and streaming the results passed through a filtering query. Fortunately, Jigu handles the necessary WebSocket connections by providing several abstractions you can take advantage of, explained below. You will find all the necessary functions for defining your own listeners through your Terra instance and in the jigu.listener package.

# Transaction Listeners

A common event of interest is the inclusion of a new transaction whose message events content matches the predicates specified in a query. Potential applications include writing applications that react to specific types of transactions when they occur, or code pathways that get triggered when certain a transaction from specific accounts have been made. These make for monitoring apps that respond in real-time to blockchain app-level state changes.

You can subscribe to transaction events by creating a transaction listener. To do so, you will define a function called an event handler that will get called when the node detects that a transaction matching your query has been included in a new block. Your handler will get run for each transaction that gets matched, in the sequence they are found.

It is important to define your query parameter as your subscription can be halted by the node if your query leads to accepting too many messages, as you will get an error that you are unable to stream in data quickly enough.

For simple applications, you can create a transaction listener by directly writing your transaction event handler, which takes 2 parameters: listener and tx_info. By using the Terra.tx_listener decorator, supplying your query, the Terra instance will automatically generate a TxListener object that once run, will pass your handler the TxListener instance and the TxInfo transaction info context once it encounters transactions that match your query.

from jigu.listener import run_listeners

n = 0

@terra.tx_listener({
    "message.sender": "terra...cyzd"
})
def alert_when_cyzd(listener: TxListener, tx_info: TxInfo):
    messages = tx_info.msgs.events.message
    for m in messages:
        print(f"{m.module[0]} - {m.action[0]}")
        n += 1
    if n > 10:
        listener.stop_listening()

@terra.tx_listener({
    "message.action": "delegate"
})
def detect_delegations(listener: TxListener, tx_info: TxInfo):
    print("A delegation event occured! Yay!")

# run both listeners simultaneously
run_listeners(alert_when_cyzd, detect_delegations)

If you want more fine-grained control regarding how you want your listener client to act, you can subclass TxListener. This allows you to write an error handler that responds to RpcErrors which get raised during the connection, such as when your connection gets cancelled due to not being able to pull in data fast enough. This gives you a chance to renew your subscription and continue polling for transactions that match a nonspecific query.

You will need to instantiate your TxListener by passing in the Terra instance that you wish to connect to.

from jigu.listener import TxListener, run_listeners

class PrevoteListener(TxListener):

    query = {
        "message.action": "exchangerateprevote"
    }

    def on_tx(self, tx_info: TxInfo):
        """Pretty print the transaction information."""
        tx_info._pp

    def on_error(self, err: RpcError):
        """Resubscribe if we encounter an error."""
        self.resubscribe()

run_listeners(PrevoteListener(terra), detect_delegations)

NOTE

Inside on_error, you can also use: .ignore() (which ignores the error and does nothing), or .unsubscribe() (which cancels the subscription). You can use .stop_listening() inside either on_tx or on_error, which disconnects and halts the listener.

# Block Listeners

A block listener subscribes to new blocks and receives updates as soon as they are finalized, which you can further filter with your query. You can create block listeners similarly to how you would define transaction listeners. However, instead of being passed a TxInfo object, you are passed block_ctx, which is a JiguBox with the following keys:

Block
block

The information of the block.

EventsQuery
events.begin_block

Events that occured at the beginning of the block (emitted during the BeginBlock ABCI call).

EventsQuery
events.end_block

Events that occured at the end of the block (emitted during the EndBlock ABCI call).

The following code shows 2 ways of creating a block listener, through function decoration and inheriting from BlockListener. Note that BlockListener also allows you to define on_error like TxListener.

from jigu.listener import BlockListener, run_listeners

@terra.block_listener() # no query necessary
def new_block(listener, block_ctx):
    block = block_ctx.block
    print(f"Block #{block.height} has {block.numtxs} txs!")

class ReportEndBlock(BlockListener):

    query = {} # no query (not necessary)

    def on_block(self, block_ctx):
        # get the end block event types
        print(block_ctx.events.end_block.types)
        self.stop_listening()

run_listeners(ReportEndBlock(terra), new_block)

Block listeners will use the block-transforming and TX-fetching settings that its connected Terra instance been configured with. Learn more about those here.

# Custom Listeners

You may have noticed that TxListener and BlockListener share many structural similarities. In fact, they are both modest customizations on top of the base class TendermintEventListener, subscribing to the Tx and NewBlock Tendermint events, respectively.

Jigu has provided you with easy ways to define transaction and block listeners, but you can easily write listeners for other Tendermint events, for more advanced usecases such as realtime consensus visualizers. To do so, you will need to inherit from TendermintEventListener, and override properties event, and methods on_message, and optionally on_error.

from jigu.listener import TendermintEventListener, run_listeners

class VoteListener(TendermintEventListener):

    event = "Vote" # Tendermint Event Type
    query = {}

    def on_message(self, msg: dict):
        """Called when you get a JSON-RPC message."""

    def on_error(self, err: RpcError):
        """Called when you run into an RpcError."""

run_listeners(VoteListener(terra)) # pass in your Terra instance

When the event listener is started, it automatically contacts the node's RPC WebSocket endpoint to attempt to acquire a subscription to the event with the given query applied.

Once started, your listener should start receiving relevant events and begin processing them through the on_message handler, which is where you should perform your custom parsing and processing. You will need to process them from their raw JSON-RPC responses.

Here are some links where you can find more information about Tendermint events.

# Query Syntax

Depending on the type of listener you are creating (i.e. which Tendermint event you are listening to, like Tx or NewBlock, etc.), there are different criteria you can filter your subscription with. You can use the name of event that was emitted, and one of its attributes to define a key, and compare that against a value.

For instance, the message handler for MsgExchangeRateVote emits a vote event with denom as an attribute, so you can use vote.denom = 'ukrw' to filter transactions/blocks that contain exchange rate votes for KRT.

There are three ways in which you can define a query. They are based on the query syntax used by Tendermint for PubSub, avaiable here. In short, they logical conjunctions (i.e. joined by AND) of simple predicates for the operators =, <, <=, >, >=, CONTAINS (value contains substring), and EXISTS (key exists).

You do not need to supply the tm.event='...' portion of the query as that is defined by the event variable of the listener class.

# With Query Builder

The Query Builder is accessible with jigu.listener.Q, and helps you construct valid Tendermint queries. You can join them together using the & operator.

from jigu.listener import Q

query = Q("message.action").eq("multisend") & Q("message.sender").contains("terra...")

The following methods are defined:

Operation Method
= .eq(x)
< .lt(x)
<= .le(x)
> .gt(x)
>= .ge(x)
EXISTS .exists
CONTAINS .contains(x)

# With a string

You can use a valid Tendermint query string.

query = "message.action = 'multisend' AND message.sender CONTAINS 'terra...'"

# With a dictionary

Testing specific keys for exact equality is the most common operation for most listeners. For simple queries requiring only equality (=), you can use a dictionary, which defines a query where the keys equal the value.

query = {
    "message.action": "multisend",
    "message.sender": "terra..." # not contains, but exact equality
}

The previous queries, which use CONTAINS, cannot be represented with using this method.

Updated on: 3/10/2020, 5:07:35 PM