Mastodon is a federated social network, similar to a distributed Twitter. Conveniently, it provides a public firehose of posts, which lets us see how Arroyo can be used in a real-world setting.

In this tutorial, we are going to build a simple dashboard that shows the most popular hashtags across the Mastodon network.

Before starting this tutorial, you will need a running Arroyo cluster. See the getting started guide to set one up.

Creating the connection

The first thing we need to do is create a source that reads from the Mastodon firehose. We will use Server-Sent Events (SSE, also known by the name of its browser API, EventSource) to read from the firehose via Mastodon's public streaming API.

Arroyo comes with an SSE connector, so we just need to create a connection that points to the Mastodon API. Connections in Arroyo allow queries to read data from and write data to external systems.

There are two ways to create a connection: we can create a reusable connection in the Connections tab, or we can create a connection directly in SQL. For this tutorial, we'll create the connection directly in SQL.

Navigate to the Pipelines tab (http://localhost:8000/pipelines), and click Create Pipeline.

Connection tables are created via CREATE TABLE statements whose WITH clause includes a connector option, which specifies which connector to use (in this case, sse, for Server-Sent Events). The format option is also required; it specifies the format of the data (in this case, json).

Each connector also has its own set of options. For SSE, we need to set the endpoint option to tell it where to connect. We're also setting the events option to filter which events we read; here we only keep update events, which Mastodon sends for new posts. You can find all of the options for the SSE connector in the connectors docs.

We can define the Mastodon source table like this:

CREATE TABLE mastodon (
    id TEXT,
    uri TEXT,
    content TEXT
) WITH (
    connector = 'sse',
    format = 'json',
    endpoint = 'https://mstdn.party/api/v1/streaming/public',
    events = 'update'
);
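To make the endpoint and events options more concrete, here is a minimal Python sketch of how an SSE stream is framed and what filtering on the event type means. The parse_sse helper and the sample stream are hypothetical illustrations, not Arroyo's connector code; the sample is made up, but it follows the SSE wire format (event:, data:, and comment lines, with a blank line ending each event).

```python
def parse_sse(text):
    """Yield (event_type, data) pairs from a block of Server-Sent Events text.

    A blank line ends an event, "event:" sets its type (default "message"),
    "data:" lines are joined with newlines, and lines starting with ":" are
    comments (commonly used as keep-alives) and are skipped. An event that is
    not terminated by a blank line is discarded, as in the SSE spec.
    """
    event_type, data_lines = "message", []
    for line in text.splitlines():
        if line == "":
            # Dispatch only if the event carried some data.
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is stripped
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)


# Made-up stream: a keep-alive comment, one "update" event, one "delete" event.
sample = (
    ":keep-alive\n"
    "\n"
    "event: update\n"
    'data: {"id": "1", "content": "<p>hello #arroyo</p>"}\n'
    "\n"
    "event: delete\n"
    "data: 2\n"
    "\n"
)

# Keep only "update" events, which is the filtering events = 'update' requests.
updates = [data for event, data in parse_sse(sample) if event == "update"]
print(updates)
```

Only the JSON payload of the update event survives the filter; the delete event and the keep-alive comment are dropped.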

Then we can query it to get a sense of the data:

select * from mastodon;

Click Start Preview to run a preview pipeline over the data.

You should see posts from the firehose appear as rows in the preview results, with id, uri, and content columns.

Extracting Hashtags

Next, we need to extract the hashtags from the content. Conveniently, the Mastodon API already extracts hashtags for us into a tags field, like this:

{
  ...
  "tags": [
    {
      "name": "arroyostreaming",
      "url": "https://mstdn.party/tags/arroyostreaming"
    }
  ]
}

To access the tags, we'll need to define our source slightly differently, using the raw_string format. This gives us a single column called value that contains the raw JSON data. Replace the earlier table definition with this one:

CREATE TABLE mastodon (
    value TEXT
) WITH (
    connector = 'sse',
    format = 'raw_string',
    endpoint = 'https://mstdn.party/api/v1/streaming/public',
    events = 'update'
);

Now we can pull out the tags using the extract_json_string function:

SELECT tag FROM (
    SELECT extract_json_string(value, '$.tags[*].name') AS tag
    FROM mastodon)
    WHERE tag IS NOT NULL;

When the path matches several values, as $.tags[*].name does for a post with multiple tags, extract_json_string returns only the first one, as a string. In the future, Arroyo will support an unnest function that creates multiple rows from a single row, but for now we'll just consider the first hashtag of each post.
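The first-match behavior, and what an unnest would change, can be sketched in Python. The helpers first_tag and all_tags are hypothetical stand-ins written for illustration; they assume that extract_json_string yields NULL (None here) for posts without tags, which is what the IS NOT NULL filter in the query guards against.

```python
import json


def all_tags(raw):
    """Return every tag name in a post, like $.tags[*].name.

    One row per tag is what an unnest-style expansion would produce.
    """
    post = json.loads(raw)
    return [tag["name"] for tag in post.get("tags", []) if "name" in tag]


def first_tag(raw):
    """Mimic the first-match behavior: the first tag name, or None (SQL NULL)."""
    names = all_tags(raw)
    return names[0] if names else None


# Made-up posts in the shape shown earlier (other fields omitted).
tagged = '{"id": "1", "tags": [{"name": "arroyostreaming"}, {"name": "rust"}]}'
untagged = '{"id": "2", "tags": []}'

print(first_tag(tagged))    # arroyostreaming
print(all_tags(tagged))     # ['arroyostreaming', 'rust']
print(first_tag(untagged))  # None, so WHERE tag IS NOT NULL drops this post
```

With first-match extraction, the second tag ("rust") is never counted; that is the trade-off the tutorial accepts until unnest is available.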

Counting Hashtags

The next step is to count each hashtag over a window of time. We'll use a sliding window that's 15 minutes wide (in other words, it looks back 15 minutes in the data stream) with a 5-second slide (in other words, it updates every 5 seconds). In Arroyo SQL, we introduce a sliding window with the hop function, which takes the slide followed by the width.
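The arithmetic behind this window can be sketched in Python. Because the window is 15 minutes (900 seconds) wide and slides every 5 seconds, each post falls into 900 / 5 = 180 overlapping windows. The sketch assumes windows are aligned to multiples of the slide and are half-open, [start, end); the windows_for helper is hypothetical and is not Arroyo's implementation.

```python
SLIDE_S = 5        # hop slide: a new window starts every 5 seconds
WIDTH_S = 15 * 60  # hop width: each window spans 15 minutes (900 seconds)


def windows_for(ts):
    """Return the (start, end) hop windows, in seconds, that contain ts.

    Assumes windows start at multiples of SLIDE_S and cover [start, end).
    """
    last_start = ts - ts % SLIDE_S                # latest window start at or before ts
    first_start = last_start - WIDTH_S + SLIDE_S  # earliest window still open at ts
    return [(s, s + WIDTH_S) for s in range(first_start, last_start + 1, SLIDE_S)]


windows = windows_for(1000)
print(len(windows))             # 180, i.e. WIDTH_S // SLIDE_S
print(windows[0], windows[-1])  # (105, 1005) (1000, 1900)
```

The overlap is what makes the dashboard feel live: every 5 seconds a new 15-minute window closes, so results refresh frequently while still covering a long look-back period.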

We’ll also turn our tag query into a view to make it easier to work with:

CREATE VIEW tags AS (
    SELECT tag FROM (
        SELECT extract_json_string(value, '$.tags[*].name') AS tag
        FROM mastodon)
    WHERE tag is not null
);

SELECT tag, count(*) AS count
FROM tags
GROUP BY tag, hop(interval '5 seconds', interval '15 minutes');
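Conceptually, the GROUP BY above counts each tag once for every hop window that contains it. Here is a rough Python model, assuming windows are aligned to multiples of the slide, are half-open [start, end), and timestamps are whole seconds; hop_counts is a hypothetical helper for illustration only.

```python
from collections import Counter

SLIDE_S = 5        # hop slide in seconds
WIDTH_S = 15 * 60  # hop width in seconds


def hop_counts(events):
    """Count (tag, window) pairs, mimicking GROUP BY tag, hop(...).

    events: iterable of (timestamp_seconds, tag) pairs.
    Assumes windows start at multiples of SLIDE_S and cover [start, end).
    """
    counts = Counter()
    for ts, tag in events:
        last_start = ts - ts % SLIDE_S
        # Each event is counted once in every window that contains it.
        for start in range(last_start - WIDTH_S + SLIDE_S, last_start + 1, SLIDE_S):
            counts[(tag, (start, start + WIDTH_S))] += 1
    return counts


# Made-up (timestamp, tag) pairs.
events = [(0, "rust"), (3, "rust"), (7, "arroyo"), (950, "rust")]
counts = hop_counts(events)
print(counts[("rust", (0, 900))])    # 2: posts at t=0 and t=3
print(counts[("arroyo", (0, 900))])  # 1
print(counts[("rust", (60, 960))])   # 1: only the post at t=950
```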

Finding the top tags

Finally, we can find the top tags by using a SQL window function to rank the tags by count.

Note that SQL window functions are not the same as Arroyo windows. SQL window functions perform calculations across a set of related rows (here, all of the tags in the same time window), while Arroyo windows aggregate data over time. See the documentation on SQL window functions for more.

The ranking query looks like this:

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) AS row_num
    FROM (
        SELECT count(*) AS count,
            tag,
            hop(interval '5 seconds', interval '15 minutes') AS window
        FROM tags
        GROUP BY tag, window)
) WHERE row_num <= 5;

This produces the top 5 tags over the last 15 minutes, updated every 5 seconds, by keeping only the rows whose row_num (the tag's rank within its window) is at most 5.
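The ranking step can be modeled in Python to show what ROW_NUMBER() with PARTITION BY window does: unlike GROUP BY, it keeps every row and simply numbers the rows within each window by descending count. The top_n_per_window helper is hypothetical; it breaks ties alphabetically so the output is deterministic, whereas ROW_NUMBER assigns tied rows in an unspecified order.

```python
from collections import defaultdict


def top_n_per_window(rows, n=5):
    """Mimic ROW_NUMBER() OVER (PARTITION BY window ORDER BY count DESC)
    followed by WHERE row_num <= n.

    rows: iterable of (tag, window, count) tuples, like the inner query's output.
    Returns {window: [(tag, count, row_num), ...]}.
    """
    partitions = defaultdict(list)
    for tag, window, count in rows:
        partitions[window].append((tag, count))

    result = {}
    for window, entries in partitions.items():
        # Highest count first; ties broken by tag name only to make the
        # output deterministic (SQL ROW_NUMBER leaves tie order unspecified).
        ranked = sorted(entries, key=lambda e: (-e[1], e[0]))
        result[window] = [
            (tag, count, row_num)
            for row_num, (tag, count) in enumerate(ranked, start=1)
            if row_num <= n
        ]
    return result


rows = [("rust", "w1", 4), ("arroyo", "w1", 7), ("python", "w1", 4), ("art", "w2", 1)]
print(top_n_per_window(rows, n=2))
# {'w1': [('arroyo', 7, 1), ('python', 4, 2)], 'w2': [('art', 1, 1)]}
```

Ranking restarts at 1 in every window, which is why filtering on row_num yields a separate top-N list for each 5-second update.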

Arroyo is able to perform sophisticated optimizations over SQL queries like this. See this blog post for some of the technical details.

Visualizing the results

Here is the complete query:

CREATE TABLE mastodon (
    value TEXT
) WITH (
    connector = 'sse',
    format = 'raw_string',
    endpoint = 'https://mstdn.party/api/v1/streaming/public',
    events = 'update'
);

CREATE VIEW tags AS (
    SELECT tag FROM (
        SELECT extract_json_string(value, '$.tags[*].name') AS tag
        FROM mastodon)
    WHERE tag is not null
);

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) AS row_num
    FROM (
        SELECT count(*) AS count,
            tag,
            hop(interval '5 seconds', interval '15 minutes') AS window
        FROM tags
        GROUP BY tag, window)
) WHERE row_num <= 5;

We can run the pipeline for real by clicking Start Pipeline. Give it a name, then click Start to run the pipeline.