Mastodon is a federated social network, similar to a distributed twitter. Conveniently, it provides a public firehose of all posts, which will let us see how Arroyo can be used in a real-world setting.

In this tutorial we are going to build a simple dashboard that shows the most popular hashtags across the mastodon network.

Before starting this tutorial, you will need a running Arroyo cluster. See the getting started guide to get that set up.

Creating the connection

The first thing we need to do is create a source that will read from the Mastodon firehose. We will be using the Server-Sent Events API (also known as EventSource) to read from the firehose, via this api.

Arroyo comes with an SSE connector, so we just need create a connection that points to the mastodon API. Connections in Arroyo allow queries to read and write data from external sources.

There are two ways to create a connection. We can create a re-usable connection in the Connections tab, or we can create a connection directly in SQL. For this tutorial, we’ll create the connection directly in SQL.

Navigate to the Pipelines tab (http://localhost:8000/pipelines), and click Create Pipeline.

Connection tables are created via CREATE TABLE statements that have a connector with option, which specifies which connector should be used; in this case sse (for server-sent events). Also required is format, which specifies the format of the data (in this case, json).

Then, each connector will have a set of options that may be set—for SSE, we need to set the endpoint option to tell it where to connect to. We’re also setting the events field to filter the events that we will read. You can find all of the options for the SSE connector in the connectors docs.

We can define the mastodon source connection like this:

CREATE TABLE mastodon (
    id TEXT,
    uri TEXT,
    content TEXT
) WITH (
    connector = 'sse',
    format = 'json',
    endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
    events = 'update'
);

Arroyo runs a mirror of the mastodon firehose, hosted at http://mastodon.arroyo.dev/api/v1/streaming/public, which does not require authentication. However you can also use another mastodon server for which you have an auth token, in which case you would use the headers option to set the auth token, like

WITH (
  ...
  headers = 'Authorization: Bearer <auth_token>'
)

Then we can read from it, to get a sense of the data:

select * from mastodon;

Click Start Preview to run a preview pipeline over the data.

You should see something like this:

Extracting Hashtags

Next we need to extract the hashtags from the content. Conveniently, the mastodon API already extracts hash tags for us like this:

{
  ...
  "tags": [
    {
      "name": "arroyostreaming",
      "url": "https://mastodon.social/tags/arroyostreaming"
    }
  ],
}

To access the tags, we’ll need to define our source slightly differently, using the raw string format. This will give us a single column called value with the raw JSON data.

CREATE TABLE mastodon (
    value TEXT
) WITH (
    connector = 'sse',
    format = 'raw_string',
    endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
    events = 'update'
);

Now we can pull out the tags using the extract_json function:

SELECT tag FROM (
    SELECT extract_json(value, '$.tags[*].name') AS tag
    FROM mastodon)
    WHERE tag IS NOT NULL;

The extract_json function takes a string value and a jsonpath expression and returns all matches in the input for the expression. In this case, it returns a list with the name of each tag in the message.

We can turn this expression into a view (in Arroyo, a way to re-use a specific bit of SQL computation) to make it easier to work with:

CREATE VIEW tags AS (
    SELECT btrim(unnest(tags), '"') as tag FROM (
        SELECT extract_json(value, '$.tags[*].name') AS tags
        FROM mastodon)
);

We’ve also added a call to unnest, which is a special operator that is able to “unroll” lists into individual records. This lets us handle all of the tags in a single message. Then we’ve wrapped that in btrim, as extract_json returns encoded strings like "tag"— btrim removes the leading and trailing "s so we just get tag.

Counting Hashtags

The next step is to find the count of each hashtag over a window of time. We’ll use a sliding window that’s 15 minutes wide (in other words, that looks back that far in the data stream) with a 5 second slide (in other words, that updates every 5 seconds). In SQL, we can introduce a sliding window with the hop function.

SELECT tag, count(*) AS count
FROM tags
GROUP BY tag, hop(interval '5 seconds', interval '15 minutes');

Finding the top tags

Finally, we can find the top tags by using a SQL window function to rank the tags by count:

SQL window functions are not the same as Arroyo windows. SQL window functions are a way to perform calculations over multiple rows of data, while Arroyo windows aggregate over time.

See here for more on SQL window functions.

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) as row_num
    FROM (SELECT count(*) as count,
        tag,
        hop(interval '5 seconds', interval '15 minutes') as window
            FROM tags
            group by tag, window)) WHERE row_num <= 5

This produces the top 5 tags over the last 15 minutes by filtering on the row_num column, which contains that tag’s rank within the window.

Arroyo is able to perform sophisticated optimizations over SQL queries like this. See this blog post for some of the technical details.

Visualizing the results

Now that we have our complete query:

CREATE TABLE mastodon (
    value TEXT
) WITH (
    connector = 'sse',
    format = 'raw_string',
    endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
    events = 'update'
);

CREATE VIEW tags AS (
    SELECT btrim(unnest(tags), '"') as tag FROM (
        SELECT extract_json(value, '$.tags[*].name') AS tags
     FROM mastodon)
);

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) as row_num
    FROM (SELECT count(*) as count,
        tag,
        hop(interval '5 seconds', interval '15 minutes') as window
            FROM tags
            group by tag, window)) WHERE row_num <= 5;

We can run the pipeline for real by clicking Start Pipeline. Give it a name, then click Start to run the pipeline.