Real-time analytics on the Mastodon firehose
To get started, open the Arroyo Web UI and click Create Pipeline.
Connection tables are created via CREATE TABLE statements that include a `connector` option in their WITH clause, which specifies which connector should be used; in this case `sse` (for server-sent events). Also required is `format`, which specifies the format of the data (in this case, `json`). Each connector then has its own set of options; for SSE, we need to set the `endpoint` option to tell it where to connect, and we're also setting the `events` option to filter which events we read. You can find all of the options for the SSE connector in the connectors docs.
We can now define the Mastodon source connection.
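A minimal sketch of what that table might look like; the table name `mastodon`, the `'json.unstructured'` option (which keeps each event as a single raw string), and the `events = 'update'` filter are assumptions drawn from the rest of this walkthrough, so adjust them against the connector docs:

```sql
CREATE TABLE mastodon (
    value TEXT
) WITH (
    connector = 'sse',
    format = 'json',
    -- assumption: unstructured mode, so each event lands as raw JSON in `value`
    'json.unstructured' = 'true',
    -- where to connect, and which SSE event types to read
    endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
    events = 'update'
);
```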
Here we're using the endpoint http://mastodon.arroyo.dev/api/v1/streaming/public, which does not require authentication. However, you can also use another Mastodon server for which you have an auth token, in which case you would use the `headers` option to pass it, as sketched below.
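For instance, a source pointed at your own server might include something like the following in its WITH clause; the server URL and the <your-token> placeholder are illustrative, and the exact header syntax is described in the SSE connector docs:

```sql
endpoint = 'https://your.mastodon.server/api/v1/streaming/public',
headers = 'Authorization: Bearer <your-token>'
```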
Once the table is defined, click Start Preview to run a preview pipeline over the data.
You should see a stream of events, each as a single row with one column, `value`, containing the raw JSON data.
To pull the tags out of that JSON, we can use the `extract_json` function.
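A sketch of that query; the JSONPath `'$.tags[*].name'` assumes the hashtags live under the top-level `tags` field of each status event:

```sql
-- pull the list of tag names out of each raw JSON event
SELECT extract_json(value, '$.tags[*].name') AS tags
FROM mastodon;
```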
The `extract_json` function takes a string value and a JSONPath expression, and returns all matches in the input for that expression. In this case, it returns a list with the name of each tag in the message.
We can turn this expression into a view (in Arroyo, a way to reuse a specific piece of SQL computation) to make it easier to work with.
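A sketch of that view; the view name `tags` and column name `tag` are arbitrary choices:

```sql
CREATE VIEW tags AS (
    SELECT
        -- unroll the list of tag names, then strip the surrounding quotes
        btrim(unnest(extract_json(value, '$.tags[*].name')), '"') AS tag
    FROM mastodon
);
```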
Here we're using `unnest`, a special operator that "unrolls" lists into individual records, which lets us handle all of the tags in a single message. We've then wrapped that in `btrim`, since `extract_json` returns JSON-encoded strings like "tag"; `btrim` removes the leading and trailing quotes so we're left with just tag.
Now we can count tags over time using a sliding window, which in Arroyo SQL is expressed with the `hop` function.
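A sketch of that query; the 15-minute window, 5-second slide, and top-5 cutoff are illustrative choices:

```sql
SELECT * FROM (
    -- rank tags within each window by how often they appear
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC
    ) AS row_num
    FROM (
        -- count each tag over a 15-minute window that slides every 5 seconds
        SELECT count(*) AS count,
            tag,
            hop(interval '5 seconds', interval '15 minutes') AS window
        FROM tags
        GROUP BY tag, window
    )
) WHERE row_num <= 5;
```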
The `row_number` window function adds a `row_num` column, which contains that tag's rank within the window, letting us keep just the most popular tags.
When we're happy with the results, we can click Launch. Give the pipeline a name, then click Start to run it.