Transport Service
The distributed hash table service need a reliable transport mechanism for two reasons. It’s important not to lose parts of the has table in transit. Moreover, for consistency, we rely on the fact that we never have two values for the same key on the system. This invariant could be violated if messages were duplicated.
We will build a non-duplicating ordered transport service. The fact that message delivery is ordered isn’t needed for the sharded hash table protocol, but it makes it easier to implement reliable non-duplicating transmission.
Specification
Here is the interface of the transport service:
module sht_transport(lower,req,shard,seq_num,id) = {
relation requested(D:id,R:req)
relation replied(D:id,R:req)
relation delegated(D:id,S:shard)
action send_request(src:id,dst:id,rq:req) returns (ok:bool)
action send_delegate(src:id,dst:id,s:shard) returns (ok:bool)
action send_reply(src:id, dst:id, rq:req) returns (ok:bool)
action recv_request(dst:id,rq:req)
action recv_reply(dst:id,rq:req)
action recv_delegate(dst:id,s:shard)
...
}
The lower
parameter of the module is the low-level network
interface, an unreliable, duplicating message sevice. Parameters req
and shard
are data types for message contents, seq_num
is a type
of packet sequence numbers and id
is a type of process id’s.
The service has specific calls and call-backs for the three types of
messages in the protocol: request, reply and delegate. The send
actions
return a Boolean ok
to indicate the message was successfully enqueued.
This allows for the possibility that the message queues are bounded.
There are three corresponding relations in the abstract state:
requested
, replied
and delegated
. Each indicates that a message
of a given type is in transit to a certain destination. The type req
is used for both request and reply messages, while the delegate
message contains a shard.
The service specification has three parts relating to the three types of messages. Here is the specification for request messages:
object spec = {
after init {
requested(D,R) := false
}
before send_request {
assert ~requested(dst,rq)
}
after send_request {
if ok {
requested(dst,rq) := true
}
}
before recv_request {
assert requested(dst,rq);
requested(dst,rq) := false
}
...
The specifications for reply and delegate messages are similar. The specification does not allow a meesage to be sent if it is already in transit. In other words, we cannot have two identical messages in the network. This might at first seem odd. That is, there is no reason we cannot have to outstanding request for the same data. Request operations are unique, however, because each contains a unique transaction id as a ghost field.
Notice that send_request
only records a request in transit
if the return value ok
is true. A request can only be received if it
is in transit, and when it is received is removed from the set of
requests in transit.
The specifications of the reply and delegation actions are similar, except for the data types.
Implementation
To implement the transport service, we need a concrete data type for messages. We’ll us a tagged union with four tags:
type mtype = {request_t, reply_t, delegate_t, ack_t}
The first three are for our three types of protocol messages and the
last, ack_t
is for acknowledgment packets. Here is the struct that
holds the messages:
object net_msg = {
type t = struct {
mty : mtype,
src : id,
num : seq_num.t,
rq : req,
sh : shard
}
}
In has a tag mty
, a source id, a sequence number and two payload
fields: a req
and a shard
. This is a little bit wasteful of
network bandwidth, since only one of these two fields is actually
used. When Ivy has built-in support for tagged union types, this waste
can be eliminated. Since we are using a concrete datatype for the
messages, the IVy compiler can generate serializers and
deserializers for messages. We don’t have to write them from
scratch.
Now let’s look at the code. We will describe the implementation in the parameterized style, with one object for each process id. We’ll call this object the host. Each host has set of outgoing message queues, one per destination host, and each queue has its own timer for re-transmission:
object impl(me:id) = {
instance mq(D:id) : message_queue(net_msg,seq_num)
instance timer(D:id) : timeout_sec
In addition, for each destination, we record send_seq
, the next
sequence number to use, and for each message source we record
recv_seq
the next sequence number to receive. Initially, both are zero:
individual send_seq(S:id) : seq_num.t
individual recv_seq(S:id) : seq_num.t
init recv_seq(S) = 0 & send_seq(S) = 0
Sending
Here is the code that sends requests:
implement send_request(dst:id,rq:req) {
local msg : net_msg.t, seq : seq_num.t {
net_msg.mty(msg) := request_t;
net_msg.src(msg) := me;
net_msg.rq(msg) := rq;
net_msg.num(msg) := send_seq(dst);
send_seq(dst) := seq_num.next(send_seq(dst));
ok := mq(dst).enqueue(msg);
if ok {
call lower.send(me,dst,msg)
}
}
}
It starts by constructing packet msg
by filling in the fields. The
sequence number of the packet is send_seq
for the specified
destination. Then send_seq
is incremented and we call the message
queue mq(dst)
to enqueue the message. Finally, if the message was
successfully enqueued, we call into the low-level network interface to
actually send the message. Notice that the source parameter is me
, the
id of the host.
Sending messages for replies and delegations is similar.
Receiving
To receive messages, we implement the recv
action of the low-level
network interface. Here is the code:
implement lower.recv(msg:net_msg.t) {
local src:id,seq:seq_num.t {
seq := net_msg.num(msg);
src := net_msg.src(msg);
if seq <= recv_seq(src) & net_msg.mty(msg) ~= ack_t {
local ack : net_msg.t {
net_msg.mty(ack) := ack_t;
net_msg.src(ack) := me;
net_msg.num(ack) := seq;
call lower.send(me,src,ack)
}
};
if net_msg.mty(msg) = ack_t {
call mq(src).delete_all(seq)
}
else if seq = recv_seq(src) {
recv_seq(src) := seq_num.next(recv_seq(src));
if net_msg.mty(msg) = request_t {
call recv_request(me,net_msg.rq(msg))
}
else if net_msg.mty(msg) = reply_t {
call recv_reply(me,net_msg.rq(msg))
}
else if net_msg.mty(msg) = delegate_t {
call recv_delegate(me,net_msg.sh(msg))
}
}
}
}
Several things are going on here. First we deal with acknowledgments.
We acknowledge a message if its sequence number is less than or equal to
the expected sequence number recv_seq
. The idea here is that all packets
with lesser sequence numbers have already been received. We have to
acknowledge the packet because the eariler acknowledgment might have been
lost. A greater sequence number will be ignored, since we receive
messages strictly in order.
Next, if the messages is an acknowledgment, we know that all the messages up to the sequence number of the ack have been received (again, because reception is ordered). So we delete all the outgoing messages in the queue with sequence numbers up to the acknowledged one.
Then we handle payloads. If the message has sequence number
recv_seq
(the one we are expecting to receive) we increment
recv_seq
and process the message. We split cases on the message tag
and call the appropriate recv
action with the appropriate payload
field.
Timeouts
When a message queue times out, we may need to retransmit a
message. We do this by implementing the timeout
action of the timer
interface, like this:
implement timer.timeout(dst:id) {
if ~mq(dst).empty {
call lower.send(me,dst,mq(dst).pick_one)
}
}
Notice that because the timer is a parameterized object, the timeout
action as a parameter corresponding to the destination host id. We
check to see if the corresponding queue is empty. If not, we call
pick_one
to select a message to re-transmit, and then call the
low-level network service to send the message.
Proof
The proof of this implementation requires quite a few invariants, partly because we need similar invariants requests, replies and delagations.
Message queue invariants
First, we need to relate the message queue contents to the abstract state relations that indicate which messages are in transit. For requests, an enqueued message that is not yet received must correspond to a request in transit:
conjecture mq(D).contents(M) & impl(D).recv_seq(me) <= net_msg.num(M)
& net_msg.mty(M) = request_t -> requested(D,net_msg.rq(M))
Moreover, we can’t have unreceived duplicate messages in the network. This is a bit subtle. We disallow two identical messages in the same queue (i.e., with distinct sequence numbers). We also have to disallow a message in transit to the same destination from two distinct sources, since the abstract state does not distinguish sources:
conjecture impl(S1).mq(D).contents(M1) & impl(D).recv_seq(S1) <= net_msg.num(M1)
& impl(S2).mq(D).contents(M2) & impl(D).recv_seq(S2) <= net_msg.num(M2)
& net_msg.mty(M1) = request_t & net_msg.mty(M2) = request_t
& (S1 ~= S2 | net_msg.num(M1) ~= net_msg.num(M2))
-> net_msg.rq(M1) ~= net_msg.rq(M2)
The above is a bit redundant. It might be better style to define a relation describing an unreceived request message in a given queue. We have a similar invariants for replies and delegations.
To make sure we don’t create duplicate sequence numbers in the queues, we need to say that now that the sending sequence number is bigger than any message in the queue:
conjecture mq(D).contents(M) -> ~(send_seq(D) <= net_msg.num(M))
No sequence number occurs twice in a queue (this is actually an invariant of message queues and could have been stated in their implementation):
conjecture mq(D).contents(M1) & mq(D).contents(M2) & M1 ~= M2
-> net_msg.num(M1) ~= net_msg.num(M2)
We also need to know that only appropriate messages are enqueued, that is, messages in a queue have a correct source field and are not acknowledgments:
conjecture mq(D).contents(M) -> net_msg.src(M) = me & net_msg.mty(M) ~= ack_t
Low-level network invariants
We need to relate the messages in transit in the low-level network to the implemention state. For non-acknowledgment messages, we need to know three things:
First, a message intransit must match any queue entry with the same sequence number:
conjecture lower.spec.sent(M,D) & net_msg.src(M) = me
& mq(D).contents(M2) & net_msg.num(M2) = net_msg.num(M)
& net_msg.mty(M) ~= ack_t -> M = M2
Second, a low-level message that hasn’t been received yet must still be in the corresponding queue:
conjecture lower.spec.sent(M,D) & net_msg.src(M) = S
& impl(D).recv_seq(S) <= net_msg.num(M) & net_msg.mty(M) ~= ack_t
-> impl(S).mq(D).contents(M)
Third, every low-level message as actually been sent:
conjecture lower.spec.sent(M,D) & net_msg.src(M) = me & net_msg.mty(M) ~= ack_t
-> ~(send_seq(D) <= net_msg.num(M))
Taken together, these properties say that the unreceived low-level messages are a subset of the messages in the appropriate outgoing queue.
Finally, we need to know that low-level acknowledgment packets are correct. This means that any acknowledged sequence number must actually have been received:
conjecture lower.spec.sent(M,D) & net_msg.src(M) = S
& net_msg.mty(M) = ack_t -> ~(impl(S).recv_seq(D) <= net_msg.num(M))
Together, these invariants are inductive and are sufficient to show the implementation is correct in the sense of delivering each sent protocol message no more than once.
The message queues
We aqlso need to verify the mesage queues agains their specification. We will use parameter stripping to do that:
isolate iso_mq(mq_me:id) = mq(mq_me) with seq_num
What this means is that we verify one message queue instance named
mq_me
in isolation from the others. This works because the different
message queues don’t interfere with each other.
Testing
Before implementing the application-level protcol on top of this service, it’s worth instantiating and proving it, and perhaps also testing it a bit. Here is a test module:
include trans
include seqnum
include udp
type id
type req
type shard
instance seq_num : sequence_numbers
instance t : sht_transport(u,req,shard,seq_num,id)
instance u : udp_simple(id,t.net_msg.t)
isolate iso_t = t.impl with t,u,seq_num
export t.send_request
export t.send_delegate
import t.recv_request
import t.recv_delegate
We left the req
and shard
types uninterpreted, since these don’t
matter to the transport service. We use udp_simple
as the low-level
datagram service. Let’s verify this module:
ivy_check trans_test.ivy
Checking isolate iso_t...
trying ext:t.impl.timer.timeout...
checking consecution...
trying ext:t.send_delegate...
checking consecution...
trying ext:t.send_request...
checking consecution...
trying ext:u.recv...
checking consecution...
Checking isolate seq_num.iso...
trying ext:seq_num.iter.create...
checking consecution...
trying ext:seq_num.iter.end...
checking consecution...
trying ext:seq_num.next...
checking consecution...
Checking isolate t.impl.iso_mq...
trying ext:t.impl.mq.delete_all...
checking consecution...
trying ext:t.impl.mq.empty...
checking consecution...
trying ext:t.impl.mq.enqueue...
checking consecution...
trying ext:t.impl.mq.pick_one...
checking consecution...
OK
Notice the IVy actually verified three isolates: iso_t
(the
transport module), seq_num.iso
(the sequence number module) and
t.impl.iso_mq
(the parameter-stripped message queue module).
Compile and run
Let’s compile to a REPL and try a few packets:
$ make -B trans_test
ivy_to_cpp target=repl isolate=iso_impl trans_test.ivy
g++ -g -o trans_test trans_test.cpp
./trans_test
> t.send_request(0,1,42)
true
> t.recv_request(1,42)
t.send_delegate(1,0,66)
true
> t.recv_delegate(0,66)
...
Notice the order of events. We call send_request
, which responds with true
meaning the pack was sent. Then asynchronously the packet arrives, which results
in a callback of recv_request
. Also notice we entered integers for message
payloads. The compiler uses machine integers to implement the uninterpreted types,
though logically any type would work.
As an excercise, you might try stripping the first parameter to generate a parallel implementation. Check that if you send a message to a process that hasn’t started yet, the packet is retried until that process starts.