WebSocket is a protocol that enables bidirectional streaming communication between clients and servers. It is a common choice for real-time web applications. Arroyo provides a WebSocket source that allows you to read from WebSocket APIs in your pipeline.

WebSocket is a very flexible protocol, and does not have standard semantics for tracking progress or restarting from a past point in time. As a result, the WebSocket source may currently miss messages on recovery.

Various APIs may also require some level of negotiation before the WebSocket connection becomes active. The WebSocket source currently supports sending a single subscription message on connection, which is sufficient to work with many real-world APIs.

If you’d like to use the WebSocket source with an API that requires additional features, reach out to the team on Discord.

Configuring the Connection

WebSocket sources can be created via the Web UI or directly in SQL.

[Figure: WebSocket creation flow]

A WebSocket source has several required and optional fields:

| Field | Description | Required | Example |
| --- | --- | --- | --- |
| endpoint | The endpoint for the WebSocket server | Yes | wss://api.example.com/v1/events |
| subscription_message | A message that will be sent to the server after connection | No | {"type": "subscribe", "channel": "events"} |

For example, in SQL:

CREATE TABLE coinbase (
    type TEXT,
    price TEXT
) WITH (
    connector = 'websocket',
    endpoint = 'wss://ws-feed.exchange.coinbase.com',
    subscription_message = '{
      "type": "subscribe",
      "product_ids": [
        "BTC-USD"
      ],
      "channels": ["ticker"]
    }',
    format = 'json'
);
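
Once the source is defined, it can be queried like any other table. The following is a minimal sketch, assuming the feed delivers ticker updates with type set to 'ticker' and the price encoded as a decimal string; the price_usd alias is illustrative and not part of the source definition.

```sql
-- Keep only ticker updates; the feed may also deliver other message
-- types (for example, a subscription acknowledgement) with no price.
-- price is declared as TEXT in the table, so cast it before numeric use.
SELECT
    CAST(price AS DOUBLE) AS price_usd
FROM coinbase
WHERE type = 'ticker';
```

Filtering on type before casting is a defensive choice here: it avoids attempting to parse a price from messages that do not carry one.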