suber

    Dark Mode
Search:
Group by:

A Pub/Sub engine.

Receives messages from multiple sources and delivers them as a serialized stream. Publisher threads do not block each other or the delivery thread, and delivery does not block publishing. Messages can belong to multiple topics. Subscribers can subscribe to multiple topics. Topics, subscribers and subscriptions can be modified at any time. Delivery may be triggered on message push, by the total size of undelivered messages, by the number of undelivered messages, or on call. Keeps a message cache so that subscribers can sync their state at will.

Example

# nim c -r --gc:arc --threads:on --d:release example.nim
import suber, os, random, times

# Demo fixtures. The position of an entry in `topics` / `subscribers` is used
# as its Topic / Subscriber id throughout the example.
let
  topics = ["Art", "Science", "Nim", "Fishing"]
  subscribers = ["Amy", "Bob", "Chas", "Dave"]
  messagedatas = ["Good News", "Bad News", "Breaking News", "Old News"]
  publishers = ["Helsinki", "Tallinn", "Oslo", "Edinburgh"]
  bus = newSuber[string, 4]() # string datas, max 4 topics

# Flag that tells the worker loops to finish; set by `deliver`.
var stop: bool

proc generateMessages(publisher: int) =
  ## Thread body of one publisher.
  ##
  ## Until `stop` is set, sleeps 100-600 ms, prints the publisher's name and
  ## pushes one message with random data to a random set of topics.
  ## `publisher` is an index into `publishers`.
  {.gcsafe.}: # the example deliberately touches globals from threads
    let origin = publishers[publisher]
    while not stop:
      sleep(100 + rand(500))
      if stop: break # do not publish after shutdown was requested mid-sleep
      stdout.write(origin & ", ")
      stdout.flushFile()
      var messagetopics = initIntSet()
      # One to three draws; duplicate draws collapse inside the set.
      for _ in 1 .. 1 + rand(2):
        messagetopics.incl(rand(3))
      bus.push(messagetopics, messagedatas[rand(3)] & " from " & origin)

proc deliver() =
  ## Thread body of the deliverer: triggers ten deliveries, one second apart,
  ## and then sets `stop` so that the other worker loops finish.
  {.gcsafe.}:
    for round in 1 .. 10:
      sleep(1000)
      echo ""
      bus.doDelivery()
    stop = true

proc writeMessages(subscriber: int, messages: openArray[ptr SuberMessage[string]]) =
  ## Prints every message as
  ## "@<timestamp> to <recipients>, concerning <topics>: <data>".
  ##
  ## When `subscriber` is greater than -1 it is an index into `subscribers`
  ## naming the single recipient, and a "--" separator line is printed after
  ## the batch. Otherwise the recipients of each message are looked up from
  ## the bus.
  let broadcast = subscriber <= -1
  var recipients: IntSet
  try:
    for msg in messages:
      if broadcast:
        stdout.write("@" & $msg.timestamp & " to ")
        bus.getSubscribers(msg, recipients)
        for id in recipients:
          stdout.write(subscribers[id] & ", ")
      else:
        stdout.write("@" & $msg.timestamp & " to " & subscribers[subscriber] & ", ")
      stdout.write("concerning ")
      for topic in msg.topics.items():
        stdout.write(topics[topic.int] & " & ")
      # Two backspaces step back over the trailing "& " so ": " overwrites it.
      stdout.writeLine("\b\b: " & msg.data)
    stdout.flushFile()
    if not broadcast:
      echo "--"
  except:
    # Output is best effort: this proc is called from `raises: []` callbacks,
    # so nothing may escape from here.
    discard # write IOError
 
 proc onPull(subscriber: Subscriber, expiredtopics: openArray[Topic], messages: openArray[ptr SuberMessage[string]]) =
  {.gcsafe.}:
    if expiredtopics.len > 0: echo "Sorry, not stocking that old news anymore."
    else: writeMessages(int(subscriber), messages)

proc redeliver() =
  ## Thread body that simulates subscriber "Chas" catching up.
  ##
  ## Until `stop` is set, sleeps 2-5 s and then pulls the "Nim" topic messages
  ## published after a timestamp 0.5-2 s in the past. Results arrive through
  ## the pull callback.
  {.gcsafe.}:
    while not stop:
      sleep(2000 + rand(3000))
      if stop: break # shutdown was requested while sleeping
      let since = getMonoTime() - initDuration(milliseconds = 500 + rand(1500))
      echo ""
      echo "--"
      echo("Chas requests Nim-related news since timestamp @", since)
      let subscriber = subscribers.find("Chas")
      bus.pull(subscriber, toIntSet([topics.find("Nim")]), since)

proc onDeliver(messages: openArray[ptr SuberMessage[string]]) {.gcsafe, raises:[].} =
  ## Deliver callback of the example: prints the delivered batch. Passing -1
  ## as the subscriber makes `writeMessages` list the recipients of every
  ## message.
  {.gcsafe.}:
    writeMessages(-1, messages)

# Register the callbacks before any message can be published.
bus.setDeliverCallback(onDeliver)
bus.setPullCallback(onPull)

# Subscriber i subscribes to topics 0 .. i; `true` creates missing topics.
for i in 0 ..< 4:
  for j in 0 .. i:
    bus.subscribe(i.Subscriber, j.Topic, true)

var delivererthread: Thread[void]
createThread(delivererthread, deliver)

var redelivererthread: Thread[void]
createThread(redelivererthread, redeliver)

var publisherthreads: array[4, Thread[int]]
for i in 0 ..< 4:
  createThread(publisherthreads[i], generateMessages, i)

# Publishers end once `deliver` has set `stop`; only then shut the bus down.
joinThreads publisherthreads
bus.stop()
joinThreads([delivererthread, redelivererthread])
echo ""

Types

Topic = distinct int
Subscriber = distinct int
SuberMessage[TData] = object
  topics*: IntSet            ## The `Topic`s that this message belongs to
  timestamp*: MonoTime       ## Unique timestamp that orders and identifies messages
  data*: TData               ## Message payload, a generic type
  size: int                  ## Size of the data, used to trigger size-based deliveries
  
The push, pull, deliver and find callbacks receive (pointers to) SuberMessages.
DeliverCallback[TData] = proc (messages: openArray[ptr SuberMessage[TData]]) {...}{.
    gcsafe, raises: [].}

Proc that gets called when there are messages to be delivered. Set this with setDeliverCallback.

Note that DeliverCallback must be gcsafe and not raise any exceptions.

PushCallback[TData] = proc (message: ptr SuberMessage[TData]) {...}{.gcsafe,
    raises: [].}

Proc that gets called every time when a new message is published. Set this with setPushCallback.

Note that PushCallback must be gcsafe and not raise any exceptions.

PullCallback[TData] = proc (subscriber: Subscriber;
                            expiredtopics: openArray[Topic];
                            messages: openArray[ptr SuberMessage[TData]]) {...}{.
    gcsafe, raises: [].}

Proc that is called when subscriber pulls old messages.

subscriber identifies the puller
expiredtopics is the list of topics for which not all requested messages are still cached
messages includes results for topics that had all requested messages still in cache

Set this with setPullCallback.

Note that PullCallback must be gcsafe and not raise any exceptions.

Suber[TData; SuberMaxTopics] = ref object
  
The Suber service. Create one with newSuber.

TData is the generic parameter that defines the type of message data
SuberMaxTopics is the maximum number of distinct topics

Note: If you have distinct topics (subscriber partitions) and application logic allows, it may be profitable to have dedicated Suber for each partition.

Procs

proc `==`(x, y: Topic): bool {...}{.borrow.}
proc `==`(x, y: Subscriber): bool {...}{.borrow.}
proc `$`(x: Topic): string {...}{.borrow.}
proc `$`(x: Subscriber): string {...}{.borrow.}
proc toMonoTime(i: int64): MonoTime {...}{.inline, raises: [], tags: [].}
proc toIntSet(x: openArray[Topic]): IntSet {...}{.inline, raises: [], tags: [].}
proc toIntSet(x: openArray[Subscriber]): IntSet {...}{.inline, raises: [], tags: [].}
proc newSuber[TData; SuberMaxTopics: static int](cachemaxcapacity = 10000000;
    cachelength = 1000000; maxdelivery = -1; channelsize = 200): Suber[TData,
    SuberMaxTopics]
proc setPushCallback[TData](suber: Suber; onPush: PushCallback[TData])
proc setDeliverCallback[TData](suber: Suber; onDeliver: DeliverCallback[TData])
proc setPullCallback[TData](suber: Suber; onPull: PullCallback[TData])
proc stop[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics])

1: refuses to accept new messages
2: returns the processing thread for joining
3: all already accepted messages (in the channel buffer) are processed to cache (may trigger deliveries)
4: executes one final delivery, if undelivered messages exist in cache
5: processing loop stops, thread is stopped and joined with caller

proc stopImmediately[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics])
instantly stops the service
proc getChannelQueueLengths(suber: Suber): (int, int, int)
Reports amounts of buffered messages in channel queue for monitoring and backpressure purposes:

first field: Current number of messages in the channel buffer
second field: Peak number of queued messages since queue was empty
third field: Maximum number of queued messages ever

proc addTopic(suber: Suber; topic: Topic | int): bool {...}{.discardable.}

Adds new topic.

Returns false if maximum number of topics is already added.

proc removeTopic(suber: Suber; topic: Topic | int)
proc hasTopic(suber: Suber; topic: Topic | int): bool
proc getTopiccount(suber: Suber): int
proc getSubscribersbytopic(suber: Suber): seq[
    tuple[id: Topic, subscribers: IntSet]]
Reports subscribers for each topic.
proc subscribe(suber: Suber; subscriber: Subscriber; topic: Topic | int;
               createnewtopic = false): bool {...}{.discardable.}
Creates new subscription. If createnewtopic is false, the topic must already exist, otherwise it is added as needed.
proc unsubscribe(suber: Suber; subscriber: Subscriber; topic: Topic)
Removes a subscription.
proc removeSubscriber(suber: Suber; subscriber: Subscriber)
Removes all subscriptions of the subscriber.
proc getSubscriptions(suber: Suber; subscriber: Subscriber): seq[Topic]
proc getSubscribers(suber: Suber; topic: Topic | int): IntSet
proc getSubscribers(suber: Suber): IntSet
proc getSubscribers(suber: Suber; topics: openArray[Topic]): IntSet
Gets subscribers that are subscribing to any of the topics (set union).
proc getSubscribers(suber: Suber; message: ptr SuberMessage; toset: var IntSet;
                    clear = true)
Gets subscribers to given message into the toset given as parameter. If clear is true, the toset is cleared first.
proc isSubscriber(suber: Suber; subscriber: Subscriber; topic: Topic): bool
proc doDelivery[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics])
Call this to trigger a delivery (DeliverCallback). To achieve time-based delivery, call this at regular intervals.
proc push[TData](suber: Suber; topics: sink IntSet; data: sink TData;
                 size: int = 0)
Pushes new message to message cache to be delivered.

suber: the service
topics: set of topics that this message belongs to
data: message payload. Available later as the data field of SuberMessage
size: Size of the data. Required only when size-based delivery is being used.

proc push[TData](suber: Suber; topic: Topic | int; data: sink TData;
                 size: int = 0)
proc pull[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics];
                                 subscriber: Subscriber | int;
                                 topics: sink IntSet;
                                 aftertimestamp: sink MonoTime)
Requests messages after given timestamp and belonging to certain topics.

suber: service
subscriber: will be passed to callback
topics: set of topics that are of interest
aftertimestamp: only messages published after this timestamp will be pulled

proc pull[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics];
                                 subscriber: Subscriber | int;
                                 topic: Topic | int;
                                 aftertimestamp: sink MonoTime)
proc pullAll[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics];
                                    subscriber: Subscriber | int;
                                    topic: Topic | int;
                                    aftertimestamp: sink MonoTime)
proc find[TData](suber: Suber; query: MonoTime; callback: proc (query: MonoTime;
    message: ptr SuberMessage[TData]) {...}{.gcsafe, raises: [].})

Calls the callback with (pointer to) the message that has the timestamp given in query.

Important: If queried message is not cached, message will be nil. Always check for nil first.

Note that callback must be gcsafe and not raise any exceptions.

proc doSynced[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics];
                                     callback: proc () {...}{.gcsafe, raises: [].})

Calls the callback so that no other callbacks (delivery, etc.) are being processed in parallel.

Note that callback must be gcsafe and not raise any exceptions.

Converters

converter toTopic(x: int): Topic {...}{.raises: [], tags: [].}