Now we get to the actual protocol. Each host has a hash table and a delegation map describing where to find ranges of keys. When the host receives a request to get or set the value of a key in the directory, it looks up the key in the delegation map. If the key is delegated to the local host, the operation is performed directly on the host’s hash table. Otherwise, the request is forwarded to the host indicated in the delegation map. Since the delegation map of the local host may be out of date, the forwarded request may be forwarded repeatedly until it reaches the host with actual responsibility for the key. When this host receives the forwarded request, it performs the operation on its local hash table and returns a reply directly to the original requesting host. The requesting host then responds to the client with an answer.
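
The forwarding behaviour described above can be modelled in a few lines of Python. This is only a sketch under stated assumptions: the `Host` class, its fields and the synchronous "network" dictionary are hypothetical, the delegation map is simplified to a per-key dictionary rather than key ranges, and none of this is Ivy code.

```python
# Toy model of request forwarding. All names are hypothetical; this is not Ivy.

class Host:
    def __init__(self, host_id, network):
        self.id = host_id
        self.network = network    # shared dict: host id -> Host
        self.table = {}           # local hash table
        self.owner_of = {}        # simplified delegation map: key -> believed owner
        self.answers = []         # (key, value, hops) replies received as requester

    def believed_owner(self, key):
        # Assumption: keys that were never delegated belong to host 0.
        return self.owner_of.get(key, 0)

    def handle_request(self, src, op, key, value=None, hops=0):
        owner = self.believed_owner(key)
        if owner == self.id:
            if op == "set":
                self.table[key] = value
            else:
                value = self.table.get(key)
            # The owner replies directly to the original requester.
            self.network[src].answers.append((key, value, hops))
        else:
            # The local map may be stale, so the next host may forward again.
            self.network[owner].handle_request(src, op, key, value, hops + 1)


def demo():
    net = {}
    for i in range(3):
        net[i] = Host(i, net)
    # Host 0 delegated "a" to host 1, which later delegated it to host 2.
    net[0].owner_of["a"] = 1
    net[1].owner_of["a"] = 2
    net[2].owner_of["a"] = 2
    net[0].handle_request(0, "set", "a", 7)
    net[0].handle_request(0, "get", "a")
    return net
```

In `demo`, host 0 still believes host 1 owns the key, so each request takes two hops before host 2 serves it and replies to host 0.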

Initially, all keys belong to the master server numbered 0. A host that serves a range of keys can delegate that range to another host by extracting a shard and sending it in a delegate message. The host receiving the delegate message incorporates the shard into its own hash table. Both hosts update their delegation maps accordingly. Hosts don’t make any attempt to advertise the keys that they own to other hosts. This means that a request for a key must follow the original chain of delegation to get to the owner of the key.

Specification

Here is the client-facing interface of the protocol:

module sht_protocol(me,ref,trans,id,key,data,shard) = {

    action set(k:key.t,d:data) returns(ok:bool, tx:ref.txid)
    action get(k:key.t) returns(ok:bool, tx:ref.txid)
    action answer(k:key.t,d:data,tx:ref.txid)
    action delegate_(dst:id, lo:key.iter.t, hi:key.iter.t)  returns(ok:bool)
    ...

The parameters are me (the host’s id), ref (the reference object), trans (the transport service) and the data types id, key, data and shard, where data is the type of values in the directory.

Notice that set and get return a value of type ref.txid, or transaction id. This is a ghost type defined by the reference object that is used to define linearizability. Transaction ids don’t actually appear in the compiled implementation. When the protocol calls back to answer, it provides a ghost transaction id that allows us to specify the correct data.

Here is the specification of this interface:

    object spec = {
        before set {
            tx := ref.begin(me,ref.write,k,d)
        }
        before get {
            tx := ref.begin(me,ref.read,k,0)
        }
        before answer {
            assert ref.data_(tx) = d
            call ref.end(tx)
        }
        before delegate_ {
            assert dst ~= me;
            assert lo < hi;
            assert key.iter.between(lo,K,hi) -> impl.dm.map(K,me)
        }
    }

The reference object is used to provide the specification of the interface actions. In particular, calls to set and get begin new transactions. The reference object’s action ref.begin creates a new transaction and provides the transaction id. Notice that the transaction id is actually assigned in the specification monitor. Normally, specification monitors only make assertions about return values, and the actual return values are provided by the implementation. However, in the case of ghost values it is normal for the specification itself to provide them.

The call-back answer ends the transaction and asserts that the data it provides is correct for that transaction according to the reference object. Notice that this is a before specification because answer is a call-back. We expect the protocol to guarantee this assertion before calling answer. If this is true, then we know that there exists a linearization of the transactions that is consistent with the answers we provide.
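
To make the role of the reference object concrete, here is a minimal Python sketch of how such a monitor might behave. The `Reference` class is a hypothetical stand-in, not the Ivy reference object itself. It assumes that commit is the point at which a write takes effect and a read observes the current value, and that keys never written read as 0.

```python
# Toy stand-in for the reference object. Hypothetical; not Ivy.

class Reference:
    def __init__(self):
        self.map = {}    # abstract directory: key -> value
        self.txs = []    # one record per transaction; the list index is the txid

    def begin(self, host, op, key, data=0):
        self.txs.append({"host": host, "op": op, "key": key, "data": data,
                         "committed": False, "ended": False})
        return len(self.txs) - 1

    def commit(self, tx):
        t = self.txs[tx]
        assert not t["committed"]
        if t["op"] == "write":
            self.map[t["key"]] = t["data"]
        else:
            # Assumption: a read observes the directory at commit time.
            t["data"] = self.map.get(t["key"], 0)
        t["committed"] = True

    def end(self, tx):
        t = self.txs[tx]
        assert t["committed"] and not t["ended"]
        t["ended"] = True

    def data_(self, tx):
        return self.txs[tx]["data"]


def answer(ref, key, data, tx):
    # Mirrors `before answer`: check the data against the reference, then end.
    assert ref.data_(tx) == data
    ref.end(tx)
```

A write of 5 followed by a committed read of the same key must be answered with 5. Answering with any other value trips the assertion, which is what the monitor is for.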

The specification of delegate_ is interesting because it has no post-condition. This is because delegate_ actually has no visible effect to the user of the interface (other than, hopefully, on performance). The pre-condition says we cannot delegate to ourselves, we have to delegate at least one key and we must own all the keys that are delegated. There is a slight violation of abstraction here, since we refer to the abstract state of the delegation map, which is internal. It would be better form to make a copy of this map and describe the way that delegate_ updates it. This way users of the interface wouldn’t have to peek inside the implementation to prove the pre-condition. For now, though, we’ll be lazy and not do that, since this pre-condition is just an environment assumption that we won’t actually prove.

Implementation

The implementation is, of course, the interesting part. We start by defining a concrete struct for requests that we can pass over the network:

object req = {
    type t = struct {
        src : id,
        rkey : key.t,
        rtype : ref.otype,
        rdata : data,
        rtxid : ref.txid
    }
}

The src field allows the reply to be routed back to the source. The key, operation type and data describe the requested operation. The transaction id is ghost and is used only for specification purposes.

The implementation of a host contains a hash table and a delegation map:

object impl = {

    instance hash : hash_table(key,data,shard)

    instance dm : delegation_map(key,id)
    ...

Here is the internal action that handles a request, either locally, or by forwarding it:

    action handle_request(rq:req.t) returns(ok:bool) = {
        local src:id, k:key.t, op:ref.otype, d:data, tx:ref.txid, ow:id {
            src := req.src(rq);
            k := req.rkey(rq);
            op := req.rtype(rq);
            d := req.rdata(rq);
            tx := req.rtxid(rq);
            ow := dm.get(k);
            if ow = me {
                call ref.commit(tx);  # this is ghost!
                if op = ref.read {
                    req.rdata(rq) := hash.get(k)
                }
                else {
                    call hash.set(k,d)
                };
                ok := trans.send_reply(me, src, rq)
            } else {
                ok := trans.send_request(me, ow, rq)  # forward request
            }
        }
    }

We start by extracting the fields of the request. Then we call the delegation map dm to see who (we think) owns the request key. If the owner is me (the local host) we call ref.commit to indicate that the transaction is being committed now. This is possible because the transaction id is a ghost field of the request. In the compiled code, the reference object will be abstracted away, so this call will do nothing. Then we execute the request. If it’s a read, we get data from the hash table. If it’s a write, we set data. Then we send the modified request back to the original requester as a reply. On the other hand, if the owner is not me, we forward the request to the owner. In either case, to send a protocol message, we call the trans service.
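
The single decision made by handle_request can also be written as a Python function that returns the one message the host sends. This is a hedged sketch and not the Ivy action: the dictionary layout of the request, the `owner_of` callback and the `committed` set (standing in for the ghost call to ref.commit) are all assumptions made for illustration.

```python
# Single-step model of handle_request. Hypothetical names; not Ivy.

def handle_request(me, owner_of, table, committed, rq):
    """Return (kind, destination, request) for the one message this host sends."""
    owner = owner_of(rq["rkey"])
    if owner != me:
        return ("request", owner, rq)      # forward the request unchanged
    committed.add(rq["rtxid"])             # ghost step: absent from compiled code
    reply = dict(rq)                       # copy so the caller's request is untouched
    if rq["rtype"] == "read":
        # Assumption: keys never written read as 0.
        reply["rdata"] = table.get(rq["rkey"], 0)
    else:
        table[rq["rkey"]] = rq["rdata"]
    return ("reply", rq["src"], reply)
```

Note that the transaction is recorded as committed only on the owner's branch. A forwarded request leaves both the table and the commit record unchanged.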

Now we use handle_request to implement set and get. Here is set:

    implement set {
        local rq:req.t {
            req.src(rq) := me;
            req.rkey(rq) := k;
            req.rtype(rq) := ref.write;
            req.rdata(rq) := d;
            req.rtxid(rq) := tx;       # ghost!
            ok := handle_request(rq)
        }
    }

It just builds a request and calls handle_request. Notice, though, that the transaction id generated by the specification monitor (which runs before the implementation) is inserted in the request. The implementation of get is similar:

    implement get {
        local rq:req.t {
            req.src(rq) := me;
            req.rkey(rq) := k;
            req.rtype(rq) := ref.read;
            req.rtxid(rq) := tx;
            ok := handle_request(rq)
        }
    }

The delegate_ action is quite simple:

    implement delegate_ {
        call dm.set(lo,hi,dst);
        ok := trans.send_delegate(me,dst,hash.extract_(lo,hi))
    }

We set the range of keys in the delegation map, extract a shard and call the transport service to send the shard.

Now we have to handle the incoming messages by implementing the call-back actions of the transport service. Incoming forwarded requests are handled like this:

    implement trans.recv_request(rq:req.t) {
        local ok : bool {
            ok := handle_request(rq)
        }
    }

That is, we just call handle_request. We ignore the ok flag returned by handle_request. This means that if we don’t have resources to handle the request, the request just gets dropped. A good exercise would be to add a return parameter to trans.recv_request so that, in case the request can’t be served immediately, the packet will not be acknowledged and will then be retried. This won’t happen with the current implementation of queue, however, which doesn’t put a limit on the number of enqueued messages (in practice this means that if the queue exceeds the available memory resources, the server process will stop).

Here is how we handle incoming replies:

    implement trans.recv_reply(rq:req.t) {
        call answer(req.rkey(rq),req.rdata(rq),req.rtxid(rq))
    }

That is, we just call back to answer with the information in the reply (including the ghost transaction id).

Finally, we handle incoming delegation messages like this:

    implement trans.recv_delegate(s:shard.t) {
        call dm.set(shard.lo(s),shard.hi(s),me);
        call hash.incorporate(s)
    }

That is, we add the key range of the shard to our delegation map, indicating the local host as owner. Then we incorporate the shard into our hash table.
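
The two halves of a delegation, delegate_ on the sender and trans.recv_delegate on the receiver, can be sketched together in Python. Everything here is a hypothetical model and not Ivy: hosts are plain dictionaries, keys are integers, a range is assumed to cover keys k with lo <= k < hi, and the sender copies entries into the shard rather than removing them (this section does not say which the real extract_ does).

```python
# Toy model of a delegation handoff. Hypothetical names; not Ivy.
# Assumptions: integer keys, and a range covers keys k with lo <= k < hi.

def new_host(host_id):
    # Every host starts out believing host 0 owns the whole key space.
    return {"id": host_id, "table": {},
            "ranges": [(float("-inf"), float("inf"), 0)]}

def owner(host, key):
    # Later entries override earlier ones, like successive dm.set calls.
    for lo, hi, who in reversed(host["ranges"]):
        if lo <= key < hi:
            return who

def delegate(src, dst_id, lo, hi):
    # Pre-conditions taken from the specification of delegate_.
    assert dst_id != src["id"] and lo < hi
    assert all(owner(src, k) == src["id"] for k in range(lo, hi))
    src["ranges"].append((lo, hi, dst_id))
    pairs = {k: v for k, v in src["table"].items() if lo <= k < hi}
    return {"lo": lo, "hi": hi, "pairs": pairs}    # the shard to send

def recv_delegate(dst, shard):
    dst["ranges"].append((shard["lo"], shard["hi"], dst["id"]))
    # Drop any stale local entries in the range, then take the shard's values.
    for k in [k for k in dst["table"] if shard["lo"] <= k < shard["hi"]]:
        del dst["table"][k]
    dst["table"].update(shard["pairs"])
```

Between the call to `delegate` and the call to `recv_delegate`, neither host regards itself as the owner of the delegated keys: the sender points at the receiver, and the receiver still points at host 0.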

That’s it for the protocol.

Invariants

A fair number of invariants are needed in the proof of the protocol to cover the various kinds of protocol messages that can be in transit and to make sure that the global protocol state is consistent.

The global protocol state invariants are:

    # If I own this key, then my hash table data matches the reference object
    conjecture impl.dm.map(K,me) -> hash.hash(K) = ref.map(K)

    # If I own this key, then no one else does
    conjecture impl.dm.map(K,me) & X ~= me -> ~proto(X).impl.dm.map(K,X)

The second invariant refers to proto, which is a collection of all hosts. This will be defined when we instantiate the protocol.

The invariants for delegation messages in transit are:

    # If I own this key, then no delegated shard does
    conjecture proto.impl.dm(me).map(K,me)
        -> ~(trans.delegated(X,S) & key.iter.between(shard.lo(S),K,shard.hi(S)))

    # No two delegated shards have keys in common
    conjecture trans.delegated(X,S) & key.iter.between(shard.lo(S),K,shard.hi(S))
               & trans.delegated(X1,S1) & key.iter.between(shard.lo(S1),K,shard.hi(S1))
               -> X = X1 & S = S1

    # Delegated shards have correct data relative to reference object
    conjecture trans.delegated(X,S) & key.iter.between(shard.lo(S),K,shard.hi(S))
                -> shard.value(S,K) = ref.map(K)

    # Every shard in transit is valid
    conjecture trans.delegated(D,S) -> shard.valid(S)

Notice we state that data in a shard in transit has to reflect all committed transactions. This key invariant holds because data in transit is owned by no one and thus can’t be modified.
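
One way to get a feel for these invariants is to evaluate them on a concrete snapshot of the global state. The Python checker below is a sketch under stated assumptions and is not how Ivy checks them: the data layout is hypothetical, ranges are assumed to cover keys k with lo <= k < hi, missing values default to 0, and shard validity is not modelled.

```python
# Snapshot checker for the ownership and delegation invariants.
# Hypothetical data layout; not Ivy.

def covers(shard, key):
    return shard["lo"] <= key < shard["hi"]

def violations(keys, self_owned, tables, in_transit, ref_map):
    """self_owned: host -> keys it maps to itself; in_transit: list of shards."""
    found = set()
    for k in keys:
        holders = [h for h, ks in self_owned.items() if k in ks]
        carriers = [s for s in in_transit if covers(s, k)]
        if len(holders) > 1:
            found.add("key has two owners")
        if holders and carriers:
            found.add("key is owned and in transit")
        if len(carriers) > 1:
            found.add("shards overlap")
        if any(tables[h].get(k, 0) != ref_map.get(k, 0) for h in holders):
            found.add("owner data differs from reference")
        if any(s["pairs"].get(k, 0) != ref_map.get(k, 0) for s in carriers):
            found.add("shard data differs from reference")
    return found
```

A host may keep a stale entry for a key it has delegated away without violating anything, because the data invariant only constrains keys the host maps to itself.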

For forwarded requests, we have:

    # Forwarded requests have correct operations relative to the reference object
    conjecture trans.requested(D,R) & L = req.rtxid(R) ->
               (req.rkey(R) = ref.key_(L) &
                req.rtype(R) = ref.type_(L) &
                (req.rtype(R) = ref.write -> req.rdata(R) = ref.data_(L)))

    # All forwarded requests have been generated but not committed
    conjecture trans.requested(D,R) -> ref.generated(req.rtxid(R)) & ~ref.committed(req.rtxid(R))

    # No two forwarded requests with the same txid
    conjecture trans.requested(D1,R1) & trans.requested(D2,R2) & req.rtxid(R1) = req.rtxid(R2)
               -> D1 = D2 & R1 = R2

Notice how we use the reference object to specify correctness of data in flight. This is possible because of the ghost transaction id’s in the messages.

For forwarded replies, we have similar invariants:

    # Forwarded replies have correct operations relative to the reference
    conjecture trans.replied(D,R) & L = req.rtxid(R) ->
               (req.rkey(R) = ref.key_(L) &
                req.rtype(R) = ref.type_(L) &
                req.rdata(R) = ref.data_(L))

    # All forwarded replies have been generated and committed
    conjecture trans.replied(D,R) -> ref.generated(req.rtxid(R)) & ref.committed(req.rtxid(R))

    # No two forwarded replies with the same txid
    conjecture trans.replied(D1,R1) & trans.replied(D2,R2) & req.rtxid(R1) = req.rtxid(R2)
               -> D1 = D2 & R1 = R2

Notice that replies differ from requests in that they represent committed transactions.
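
The request and reply invariants can be read as checks over the set of messages in flight. The following Python sketch evaluates them on a snapshot. It is illustrative only: the message and transaction dictionaries are hypothetical, each list entry is treated as a distinct message, and `txs` stands in for the reference object's record of generated transactions.

```python
# Snapshot checker for the request and reply invariants.
# Hypothetical data layout; not Ivy.

def message_violations(requests, replies, txs):
    """requests/replies: lists of message dicts; txs: txid -> reference record."""
    found = set()
    for name, msgs, must_be_committed in (("request", requests, False),
                                          ("reply", replies, True)):
        seen = set()
        for m in msgs:
            tx = m["rtxid"]
            t = txs.get(tx)
            if t is None:
                found.add(name + " has ungenerated txid")
                continue
            if t["committed"] != must_be_committed:
                found.add(name + " has wrong commit status")
            if (m["rkey"], m["rtype"]) != (t["key"], t["type"]):
                found.add(name + " has wrong operation")
            # Requests must carry the right data only for writes; replies always.
            if (name == "reply" or m["rtype"] == "write") and m["rdata"] != t["data"]:
                found.add(name + " has wrong data")
            if tx in seen:
                found.add("duplicate " + name + " txid")
            seen.add(tx)
    return found
```

The only difference between the two passes is the expected commit status and the data check: a read request may carry any data, while a reply must always carry the data recorded for its transaction.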

These invariants are inductive and imply that the protocol satisfies its service specification. We’ll prove that in the next section.