rivulet: A redis-based message broker

rivulet
riv·u·let
/ˈriv(y)ələt/
A very small stream

rivulet is a message broker implementation on top of a raw redis connection. The entire broker is implemented in the client and does not require deployment.

Warning

rivulet is in active development and has not been extensively tested in production-level environments. Use at your own risk.

Quickstart

Install

$ pip install rivulet

A simple example

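import rivulet

# create a client (example URL; point it at your own redis server)
redis_url = 'redis://localhost:6379/0'
c = rivulet.connect(redis_url)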

# subscribe
channels = ['my-channel-0', 'my-channel-1']
c.subscribe(channels)

# write a message (a string) to a channel
c.write('my-channel-0', 'the message as a string')

# read available messages from all subscribed channels
inbox = c.read()
# inbox = {
#     "my-channel-0": [
#         {"id": 123,
#          "ts": 123465623,  # timestamp, ms since epoch
#          "src": "some-client-id",
#          "data": "the message as a string"}, ...],
#     "my-channel-1": ...
# }

# process all messages in all channels
for channel, messages in inbox.items():
    for message in messages:
        do_something(message['data'])

# unsubscribe
c.unsubscribe(channels)

The rivulet client

class rivulet.rivulet.Client(redis_url: str, client_id: str = None, channel_ids: List[str] = None, bufsize: int = 4096, **redis_args)

A redis-based message broker.

ping() → bool

Ping the backend.

Returns: True if successful.

read(message_limit: int = 512) → Dict[str, List[Dict[str, Union[int, str]]]]

Read available messages from all channel subscriptions.

Parameters: message_limit – Maximum number of messages to read from a channel in a single call. The maximum overall number of retrieved messages is message_limit × the number of subscribed channels.
Returns: A dictionary with subscribed channels as keys and a list of messages for every channel as values.
Raises: BackendError if there are any redis backend errors.
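
Because each call returns at most message_limit messages per channel, a busy channel may need several calls before it is empty. The following is a minimal sketch of such a loop, not part of rivulet itself. The helper name drain and the max_rounds guard are hypothetical, and the sketch assumes that a channel returning exactly message_limit messages may still have more pending.

```python
from typing import Callable, Dict, List

Inbox = Dict[str, List[dict]]


def drain(read: Callable[[int], Inbox], message_limit: int = 512,
          max_rounds: int = 100) -> Inbox:
    """Call read() repeatedly until no channel returns a full batch.

    `read` is any callable shaped like Client.read(message_limit).
    Assumption: a channel that returns exactly `message_limit` messages
    may still have more pending, so we read again.
    """
    collected: Inbox = {}
    for _ in range(max_rounds):  # hard stop so a busy producer cannot trap us
        batch = read(message_limit)
        for channel, messages in batch.items():
            collected.setdefault(channel, []).extend(messages)
        # Stop once every channel returned fewer messages than the limit.
        if all(len(messages) < message_limit for messages in batch.values()):
            break
    return collected
```

With a connected client c, this would be called as drain(c.read) or drain(c.read, message_limit=64).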

The messages returned for every channel are simple python dictionaries. The data sent with write() is available in the data field.

message = {
    'id': message_id,
    'ts': timestamp_in_ms_since_epoch,
    'src': client_id,
    'data': data
}
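
When messages from several channels need to be handled in one pass, the per-channel lists can be flattened and ordered by the ts field. This is a small sketch under the message layout shown above. The helper name merge_by_timestamp is hypothetical, and the resulting order is only as reliable as the clocks that produced the timestamps.

```python
from typing import Dict, List, Tuple


def merge_by_timestamp(inbox: Dict[str, List[dict]]) -> List[Tuple[str, dict]]:
    """Flatten an inbox into (channel, message) pairs ordered by 'ts', then 'id'."""
    pairs = [(channel, message)
             for channel, messages in inbox.items()
             for message in messages]
    return sorted(pairs, key=lambda pair: (pair[1]["ts"], pair[1]["id"]))
```

A typical use is: for channel, message in merge_by_timestamp(c.read()): ...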

subscribe(channel_ids: List[str], index_policy: rivulet.rivulet.IndexPolicy = <IndexPolicy.CURRENT: 1>, timeout_ms: int = 1000) → None

Subscribe to a list of channels.

Parameters:
  • channel_ids – A list of channels to subscribe to.
  • index_policy – The index policy for channel subscription, one of EARLIEST, CURRENT, LATEST.
  • timeout_ms – Time in milliseconds to wait for successful subscription. If subscription fails with a TimeoutError, the client is free to retry (use a backoff policy to avoid lock contention).
Raises:

TimeoutError if subscription fails within the specified timeout_ms interval. BackendError for any other redis failures.
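
The retry advice for timeout_ms can be sketched as a generic helper with exponential backoff and random jitter. This is an illustration, not rivulet code: retry_with_backoff and all of its parameters are hypothetical, and the exception types to retry on are passed in by the caller.

```python
import random
import time
from typing import Callable, Tuple, Type


def retry_with_backoff(action: Callable[[], None],
                       retry_on: Tuple[Type[BaseException], ...],
                       attempts: int = 5,
                       base_delay_s: float = 0.1,
                       max_delay_s: float = 5.0,
                       sleep: Callable[[float], None] = time.sleep) -> None:
    """Run `action`, retrying on `retry_on` with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            action()
            return
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Full jitter: sleep a random time up to the capped exponential delay,
            # so competing clients do not retry in lockstep.
            delay = min(max_delay_s, base_delay_s * 2 ** attempt)
            sleep(random.uniform(0, delay))
```

With a connected client c, a call might look like retry_with_backoff(lambda: c.subscribe(channels), retry_on=(rivulet.exceptions.TimeoutError,)).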

Index policies define how the client determines which message marks the beginning of a subscription:

IndexPolicy.EARLIEST:
The subscription starts with the earliest message that is still available in the channel.
IndexPolicy.CURRENT:
The subscription starts with the current index of the client, which is stored in the redis backend. This is useful for resuming a subscription after a client restart. If there is no stored index for a particular client and channel combination, CURRENT falls back to LATEST.
IndexPolicy.LATEST:
The subscription starts with the latest (i.e. most recent) message in the channel.
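
The three policies, including the CURRENT fallback, can be summarized in a toy model. This sketch only mirrors the description above and is not how rivulet resolves indices internally. Policy is a local stand-in for IndexPolicy with arbitrary member values, and start_index and its arguments are hypothetical.

```python
from enum import Enum, auto
from typing import Optional


class Policy(Enum):
    """Stand-in for rivulet's IndexPolicy; member values here are arbitrary."""
    EARLIEST = auto()
    CURRENT = auto()
    LATEST = auto()


def start_index(policy: Policy, earliest_id: int, latest_id: int,
                stored_id: Optional[int]) -> int:
    """Return the message id at which a subscription would start."""
    if policy is Policy.EARLIEST:
        return earliest_id
    if policy is Policy.CURRENT and stored_id is not None:
        return stored_id
    # LATEST, or CURRENT with no stored index (falls back to LATEST).
    return latest_id
```

For a channel holding messages 10 through 42, a restarted client with a stored index of 17 resumes at 17 under CURRENT, while a brand-new client under CURRENT starts at 42.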
subscriptions

Returns the list of channels this client is subscribed to.

Returns: A list of channel names.
Return type: List[str]
Raises: BackendError if anything goes wrong with Redis.

Note

The implementation always queries the current list of channel subscriptions from the Redis backend. The client holds no state and does not cache.
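
Since every access goes to the backend, it can help to read the list once and compute any changes locally. The following sketch shows one way to do that. The helper plan_subscription_changes is hypothetical and works on plain lists of channel names.

```python
from typing import Iterable, List, Tuple


def plan_subscription_changes(current: Iterable[str],
                              desired: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (to_subscribe, to_unsubscribe) needed to move from current to desired."""
    current_set, desired_set = set(current), set(desired)
    to_subscribe = sorted(desired_set - current_set)
    to_unsubscribe = sorted(current_set - desired_set)
    return to_subscribe, to_unsubscribe
```

With a connected client c, one would read current = c.subscriptions a single time, pass it to the helper, and then call c.subscribe() and c.unsubscribe() with the two resulting lists if they are non-empty.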

unsubscribe(channel_ids: List[str]) → None

Unsubscribes the client from a set of channels.

After unsubscribing the client, the function removes all messages in the channel that have been read by all remaining subscribers. If there are no subscribers left, the call also deletes all remaining messages and control data structures from the redis server.

Parameters: channel_ids – A list of channels to unsubscribe from. Invalid or non-existent channels will not raise an error.
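
The cleanup rule described for unsubscribe() can be illustrated with an in-memory toy model. It is a sketch of the rule only and says nothing about the actual redis data structures. The function name and both arguments are hypothetical, and it assumes each subscriber's index is the id of the last message it has read.

```python
from typing import Dict, List


def unsubscribe_and_trim(messages: List[int], read_index: Dict[str, int],
                         client_id: str) -> List[int]:
    """Toy model: drop `client_id`, then trim what all remaining subscribers have read.

    `messages` holds message ids in ascending order; `read_index[c]` is the id
    of the last message client `c` has read (an assumption of this sketch).
    Note that `read_index` is modified in place.
    """
    read_index.pop(client_id, None)  # unknown clients are ignored, not an error
    if not read_index:
        return []  # no subscribers left: everything is deleted
    low_water_mark = min(read_index.values())
    # Keep only messages that at least one remaining subscriber has not read yet.
    return [m for m in messages if m > low_water_mark]
```

In this model the slowest remaining subscriber decides how much of the channel is retained, so removing a slow reader can free a large number of messages at once.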

write(channel_id: str, data: str, timeout_ms: int = 10000) → None

Write a message to a channel.

Parameters:
  • channel_id – The name of the channel to write to.
  • data – The message data.
  • timeout_ms – (optional) User-specified write timeout (default=10s)
Raises:

TimeoutError if the message cannot be sent within the specified timeout_ms interval. BackendError for any other redis failures.

The function obtains a message id that is unique within the channel from the redis server, wraps the data into a message envelope, and adds the message to the channel.

A message is a simple python dictionary:

message = {
    'id': message_id,
    'ts': timestamp_in_ms_since_epoch,
    'src': client_id,
    'data': data
}
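
Since data is a plain string, structured payloads have to be serialized by the caller. One possible convention, shown here as a sketch, is to send JSON and parse it again on the reading side. The helpers encode_payload and decode_payload are hypothetical and not part of rivulet.

```python
import json
from typing import Any


def encode_payload(payload: Any) -> str:
    """Serialize a JSON-compatible object into a string suitable for write()."""
    return json.dumps(payload, separators=(",", ":"), sort_keys=True)


def decode_payload(message: dict) -> Any:
    """Parse the 'data' field of a received message back into a Python object."""
    return json.loads(message["data"])
```

A writer would call c.write(channel_id, encode_payload({"event": "started"})), and a reader would call decode_payload(message) on each message returned by read().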

class rivulet.rivulet.IndexPolicy

Enum of rivulet index policies.

rivulet.rivulet.connect(redis_url: str, client_id=None, channel_ids=None, **redis_args) → rivulet.rivulet.Client

Convenience function to create a rivulet client.

Parameters:
  • redis_url – A connection URL for the redis backend, of the form redis://[:password]@server:port/db.
  • client_id – A unique identifier for the client (default=uuid).
  • channel_ids – A list of channel ids to connect to (optional).
  • redis_args – Additional arguments to pass on to the redis from_url() constructor.
Returns: A rivulet client object.
Raises: ConnectionError if a connection to the redis server cannot be established. BackendError if subscriptions cannot be carried out successfully.
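
Passwords that contain characters such as "@" or "/" need to be percent-encoded before they are placed in a connection URL. The following sketch assembles a URL in the redis:// form accepted by redis-py's from_url(). The helper build_redis_url and its parameters are hypothetical, and how the password is decoded on the other side depends on the installed redis-py version.

```python
from typing import Optional
from urllib.parse import quote


def build_redis_url(host: str, port: int = 6379, db: int = 0,
                    password: Optional[str] = None, tls: bool = False) -> str:
    """Assemble a redis connection URL, percent-encoding the password."""
    scheme = "rediss" if tls else "redis"  # rediss:// selects a TLS connection
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"{scheme}://{auth}{host}:{port}/{db}"
```

The result can be passed directly as redis_url, for example rivulet.connect(build_redis_url("localhost")).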

Exceptions

Rivulet exceptions.

exception rivulet.exceptions.BackendError

Signifies an error in the backend (redis).

exception rivulet.exceptions.ConnectionError

Raised for errors related to redis connection handling.

exception rivulet.exceptions.RivuletError

Rivulet exception base class.

exception rivulet.exceptions.TimeoutError

Raised if a timeout is reached.