Collection Broker
The role of the collection broker is to facilitate blocking commands on collections in Garnet.
A blocking command returns immediately if an item is available in a collection; otherwise it blocks the client until an item becomes available (see: BLPOP, BLMOVE, etc.).
The broker runs its main loop on a dedicated Task whenever there are active clients waiting on collection items.
Logical Flow
Incoming blocking command:
- A client sends a blocking command, and the command handler in turn calls CollectionItemBroker.GetCollectionItemAsync.
  - If the broker's main loop is not running, it starts and waits on the next event in its event queue (brokerEventsQueue).
  - A new CollectionItemObserver object is created and an event of type NewObserverEvent is pushed into the event queue.
  - The command handler awaits one of two occurrences:
    - A semaphore signal from the main loop, notifying it that an item has been found.
    - The timeout specified by the client being reached.
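The wait in the last step can be modeled in a few lines. The following is a minimal sketch in Python rather than Garnet's C#, and it is not the broker's actual code: `Observer`, `get_collection_item`, and the tuple-shaped events are hypothetical stand-ins, and an `asyncio.Event` plays the role of the semaphore.

```python
import asyncio


class Observer:
    """Hypothetical stand-in for CollectionItemObserver."""

    def __init__(self, keys):
        self.keys = list(keys)
        self.result = None
        self.signal = asyncio.Event()  # plays the role of the semaphore


async def get_collection_item(events, keys, timeout):
    """Queue a new-observer event, then wait for a signal or the timeout."""
    observer = Observer(keys)
    await events.put(("new_observer", observer))
    try:
        await asyncio.wait_for(observer.signal.wait(), timeout)
    except asyncio.TimeoutError:
        return None  # the client's timeout elapsed before an item was found
    return observer.result


async def demo():
    events = asyncio.Queue()

    async def fake_main_loop():
        # Stand-in for the broker loop: hand "a" to the first observer it sees.
        _, observer = await events.get()
        observer.result = "a"
        observer.signal.set()

    loop_task = asyncio.create_task(fake_main_loop())
    served = await get_collection_item(events, ["list1"], timeout=1.0)
    await loop_task
    # Nothing is consuming events any more, so this call runs into its timeout.
    timed_out = await get_collection_item(events, ["list1"], timeout=0.05)
    return served, timed_out
```

Running `asyncio.run(demo())` exercises both outcomes: the first call is released by the signal, the second returns only after its timeout expires.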
Incoming "releasing" command:
- A client inserts an item into a collection, and the command handler in turn calls CollectionItemBroker.HandleCollectionUpdate.
  - If the collection is not observed by any clients, there is nothing to do.
  - Otherwise, a new event of type CollectionUpdatedEvent is pushed into the event queue.
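The early exit on the releasing side could look roughly like this. It is a sketch in Python, not Garnet's code: the function name mirrors HandleCollectionUpdate, but the dictionary of per-key observer queues and the tuple-shaped event are assumptions made for illustration.

```python
from collections import deque


def handle_collection_update(key, observers_by_key, events):
    """Queue a collection-updated event only if someone is waiting on `key`."""
    if not observers_by_key.get(key):
        return False  # key is unobserved (or its queue is empty): nothing to do
    events.append(("collection_updated", key))
    return True


events = deque()
observers_by_key = {"list1": deque(["observer-1"]), "list3": deque()}

handle_collection_update("list2", observers_by_key, events)  # never observed
handle_collection_update("list3", observers_by_key, events)  # no waiters left
handle_collection_update("list1", observers_by_key, events)  # event is queued
```

Skipping the event for unobserved keys keeps ordinary inserts cheap: the main loop only hears about a write when a blocked client might benefit from it.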
Main broker loop:
- The main loop (CollectionItemBroker.Start) continuously listens on an AsyncQueue for new incoming events and synchronously calls HandleBrokerEvent for each new event.
  - For events of type NewObserverEvent, InitializeObserver is called. InitializeObserver takes an array of keys and checks the collection values for an available item, in the order in which the keys were specified by the client. If an item is found, the observer is updated, which sets the result and signals the semaphore to release the awaiting thread. If no item is found, the observer is added to each key's observer queue.
  - For events of type CollectionUpdatedEvent, TryAssignItemFromKey is called. This method gets the key's observer queue and tries to assign the next available item from the collection stored at the key to the next observer. If an available item is found, the observer is updated, which sets the result and signals the semaphore to release the awaiting thread.
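The dispatch described above can be sketched as a small model. This is Python, not Garnet's C#, and it makes several assumptions: collections are plain deques in a dictionary, events are `(kind, payload)` tuples, `asyncio.Queue` stands in for the AsyncQueue, and observers that were already served through another key are skipped when they reach the front of a queue. The method names follow the ones in the text; everything else is hypothetical.

```python
import asyncio
from collections import deque


class Observer:
    """Hypothetical stand-in for CollectionItemObserver."""

    def __init__(self, keys):
        self.keys = keys
        self.result = None
        self.signal = asyncio.Event()

    def assign(self, key, item):
        self.result = (key, item)
        self.signal.set()  # releases the awaiting command handler


class Broker:
    def __init__(self, store):
        self.store = store  # key -> deque of items (stand-in for collections)
        self.waiting = {}   # key -> deque of observers

    def initialize_observer(self, observer):
        # Check the keys in the order the client specified them.
        for key in observer.keys:
            items = self.store.get(key)
            if items:
                observer.assign(key, items.popleft())
                return
        # Nothing available: add the observer to every key's observer queue.
        for key in observer.keys:
            self.waiting.setdefault(key, deque()).append(observer)

    def try_assign_item_from_key(self, key):
        queue = self.waiting.get(key)
        items = self.store.get(key)
        # Assumption: drop observers already served through another key.
        while queue and queue[0].signal.is_set():
            queue.popleft()
        if queue and items:
            queue.popleft().assign(key, items.popleft())

    def handle_broker_event(self, kind, payload):
        if kind == "new_observer":
            self.initialize_observer(payload)
        elif kind == "collection_updated":
            self.try_assign_item_from_key(payload)

    async def run(self, events):
        while True:
            kind, payload = await events.get()
            self.handle_broker_event(kind, payload)  # handled synchronously
            events.task_done()


async def demo():
    store = {"b": deque(["x"])}
    broker = Broker(store)
    events = asyncio.Queue()
    loop_task = asyncio.create_task(broker.run(events))

    first = Observer(["a", "b"])  # "a" is empty, "b" already holds an item
    second = Observer(["a"])      # has to wait for an insert into "a"
    events.put_nowait(("new_observer", first))
    events.put_nowait(("new_observer", second))
    await events.join()           # both observers have been initialized
    waiting_before = second.result

    store.setdefault("a", deque()).append("y")  # a client inserts into "a"
    events.put_nowait(("collection_updated", "a"))
    await events.join()

    loop_task.cancel()
    return first.result, waiting_before, second.result
```

Because every event is handled on the single loop, the observer queues and the item checks need no extra locking in this model; that is one plausible reason for funneling both event types through one queue.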
Disposed sessions:
- If a RespServerSession that has an active observer is disposed, its Dispose method calls CollectionItemBroker.HandleSessionDisposed, which in turn updates the observer and signals the semaphore awaiter to stop. Once the observer's status is changed to SessionDisposed, the main loop will not assign it any item.
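One way to picture the status check is a small state machine on the observer. The sketch below is in Python and is illustrative only: SessionDisposed is the only status named in the text, so `WAITING` and `RESULT_SET` are hypothetical, as are the method names, and the locking a real multi-threaded server would need is left out.

```python
import threading
from enum import Enum, auto


class ObserverStatus(Enum):
    WAITING = auto()           # hypothetical name
    RESULT_SET = auto()        # hypothetical name
    SESSION_DISPOSED = auto()  # mirrors the SessionDisposed status in the text


class Observer:
    def __init__(self):
        self.status = ObserverStatus.WAITING
        self.result = None
        self.signal = threading.Event()  # plays the role of the semaphore

    def handle_session_disposed(self):
        self.status = ObserverStatus.SESSION_DISPOSED
        self.signal.set()  # wake the awaiter so it can stop waiting

    def try_set_result(self, item):
        if self.status is not ObserverStatus.WAITING:
            return False  # disposed or already served: do not assign
        self.result = item
        self.status = ObserverStatus.RESULT_SET
        self.signal.set()
        return True


disposed = Observer()
disposed.handle_session_disposed()
rejected = disposed.try_set_result("x")  # the item stays in the collection

live = Observer()
accepted = live.try_set_result("x")
```

The point of the check is that an item is only removed from a collection when there is a live client to receive it; a disposed observer is woken up but never consumes anything.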