9 Stateful Data Types

Oz provides a set of stateful data types. These include ports, objects, arrays, and dictionaries (hash tables). These data types are abstract in the sense that they are characterized only by the set of operations performed on the members of the type. Their implementation is always hidden, and in fact different implementations exist but their corresponding behavior remains the same. For example, objects are implemented in a totally different way depending on the optimization level of the compiler. Each member is always unique by conceptually tagging it with an Oz-name upon creation. A member is created by an explicit creation operation. A type test operation always exists. In addition, a member ceases to exist when it is no longer accessible.

9.1 Ports

Port is such an abstract data-type. A Port P is an asynchronous communication channel that can be shared among several senders. A port has a stream associated with it. The operation: {Port.new S ?P} creates a port P and initially connects it to the variable S taking the role of a stream. The operation: {Port.send P M} will append the message M to the end of the stream associated with P. The port keeps track of the end of the stream as its next insertion point. The operation {IsPort P ?B} checks whether P is a port. In order to protect the stream S from being bound by mistake S is actually a future. The following program shows a simple example using ports:

declare S P
P = {Port.new S}
{Browse S} 

{Port.send P 1}
{Port.send P 2}

If you enter the above statements incrementally you will observe that S gets incrementally more defined.

S
1| 
1|2|_

Ports are more expressive abstractions than pure stream communication, which was discussed in Section 8.2, since they can be shared among multiple threads, and can be embedded in other data structures. Ports are the main message passing mechanism between threads in Oz.

9.2 Client-Server Communication

Ports are used as a communication entry point to servers. The program shown in Figure 9.1 defines a thread that acts as FIFO queue server. It has two ports, one for inserting items to the queue using put, and the other for fetch items out of the queue using get. The use of single-assignment (logic) variables makes the server insensitive to the relative arrival order of get and put requests. get requests can arrive even when the queue is empty. A server is created by {NewQueueServer ?Q}. This procedure returns back a record Q with features put and get each holding a unary procedure. A client thread having access to Q can request services by invoking these procedure. Notice how results are returned back through logic variables. A client requesting an Item in the queue will call {Q.get I}. The server will eventually answer back by binding I to an item.


declare 
fun {NewQueueServer}
   Given GivePort={Port.new Given}
   Taken TakePort={Port.new Taken}
in 
   Given = Taken
   queue(put:proc {$ X} {Port.send GivePort X} end 
         get:proc {$ X} {Port.send TakePort X} end)
end

Figure 9.1: Concurrent Queue server, first attempt


Try the following sequence of statements. The program will not work. So, what is the problem?

declare 
thread Q = {NewQueueServer} end 
{Q.put 1}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Q.put 2}
{Q.put 3}

The problem is that Given = Taken is trying to impose equality between two futures. Remember that Given and Taken are futures that can only be read and cannot be bound. So the thread corresponding to the queue server will suspend in the statement Given = Taken. This problem is remedied by running this statement in its own thread as shown in Figure 9.2 1.

The program works as follows. {Q.put I0} {Q.put I1} ... {Q.put In} will incrementally add the elements I0 I1 ... In to the stream Given, resulting in I0|I1|...|In|<Future1>. {Q.get X0} {Q.put X1} ... {Q.put Xn} will add the elements X0 X1 ... Xn to the stream Taken resulting in X0|X1|...|Xn|<Future2>. The equality constraint Given = Taken will bind Xi's to Ii's.


declare 
fun {NewQueueServer}
   Given GivePort={Port.new Given}
   Taken TakePort={Port.new Taken}
in 
   thread Given=Taken end 
   queue(put:proc {$ X} {Port.send GivePort X} end 
         get:proc {$ X} {Port.send TakePort X} end)
end

Figure 9.2: Concurrent Queue server


Warning: The code above is correct but, due to a limitation in the current Oz implementation, leaks memory. As a workaround, one can use the code below as a drop-in replacement.

declare 
fun {NewQueueServer}
   Given GivePort={Port.new Given}
   Taken TakePort={Port.new Taken}
in 
   thread  
      for X in Given Y in Taken do 
         X=Y
      end 
   end 
   queue(put:proc {$ X} {Port.send GivePort X} end 
         get:proc {$ X} {Port.send TakePort X} end)
end

9.3 Chunks

Ports are actually stateful data structures. A port keeps a local state internally tracking the end of its associated stream. Oz provides two primitive devices to construct abstract stateful data-types chunks and cells. All others subtypes of chunks can be defined in terms of chunks and cells.

A chunk is similar to a record except that the label of a chunk is an oz-name, and there is no arity operation available on chunks. This means one can hide certain components of a chunk if the feature of the component is an oz-name that is visible only (by lexical scoping) to user-defined operations on the chunk.

A chunk is created by the procedure {NewChunk Record}. This creates a chunk with the same components as the record, but having a unique label. The following program creates a chunk.

local X in 
   X={NewChunk f(c:3 a:1 b:2)}
   {Browse X}
   {Browse X.c}
end

This will display the following.

<Ch>(a:1 b:2 c:3)
3

Warning: As a syntactic convenience, one can equate an expression E at an expression position with a variable X = E, and use X to refer to the value of the expression. Using this notation the above program could be written as

local X in 
   {Browse X={NewChunk f(c:3 a:1 b:2)}}
   {Browse X.c}
end

In Figure 9.3, we show an example of using the information hiding ability of chunks to implement Ports.

9.4 Cells

A cell could be seen as a chunk with a mutable single component. A cell is created as follows.

{NewCell X ?C}

A cell is created with the initial content X. C is bound to a cell. The Table 9.1 shows the operations on a cell.


Operation

Description

{NewCell X ?C}

Creates a cell C with content X

X=@C

Returns the content of C in X

C:=Y

Modifies the content of C to Y

{IsCell +C}

Tests if C is a cell

{Exchange +C X Y}

Swaps atomically the content of C from X to Y

Table 9.1: Cell operations


Check the following program. The last statement increments the cell by one. If we leave out thread ... end the program deadlocks. Do you know why?

local I O X in 
   I = {NewCell a} {Browse @I}
   {Assign I b}    {Browse @I}
   {Assign I X}    {Browse @I}
   X = 5*5          
   {Exchange I O thread O+end} {Browse @I}
end

Cells and higher-order iterators allow conventional assignment-based programming in Oz. The following program accumulates in the cell J the value of \sum_{i=1}^10 i.

declare J in 
J = {NewCell 0}
{For 1 10 1
   proc {$ I}
      O N in 
      {Exchange J O N}
      N = O+I
   end}
{Browse @J}

Ports described in Section 9.1 can be implemented by chunks and cells in a secure way, i.e. as an abstract data type that cannot be forged. The program in Figure 9.3 shows an implementation of Ports. Initially an Oz-name is created locally, which is accessible only by the Port operations. A port is created as a chunk that has one component, which is a cell. The cell is initialized to the stream associated with the port. The type test IsPort is done by checking the feature Port. Sending a message to a port results in updating the stream atomically, and updating the cell to point to the tail of the stream.


declare Port in 
local 
   PortTag = {NewName} %New Oz name
   fun {NewPort S}
      C = {NewCell S} in 
      {NewChunk port(PortTag:C)}
   end 
   fun {IsPort ?P}
      {Chunk.hasFeature P PortTag} %Checks a chunk feature
   end 
   proc {Send P M}
      Ms Mr in 
      {Exchange P.PortTag Ms Mr}
      Ms = M|Mr
   end 
in Port = port(new:NewPort
               is:IsPort
               send:Send)
end

Figure 9.3: Implementation of Ports by Cells and Chunks


The implementation in Figure 9.3 does not protect the stream of the port. Protection of the stream is done using a future as follows.


declare Port in 
local 
   PortTag = {NewName} %New Oz name
   fun {NewPort FS}
      S C = {NewCell S} in 
      FS = !!S  % Create a future
      {NewChunk port(PortTag:C)}
   end 
   fun {IsPort ?P}
      {Chunk.hasFeature P PortTag} %Checks a chunk feature
   end 
   proc {Send P M}
      Ms Mr in 
      {Exchange P.PortTag Ms Mr}
      Ms = M|!!Mr
   end 
in Port = port(new:NewPort
               is:IsPort
               send:Send)
end

Figure 9.4: Implementation of Ports by Cells and Chunks



1. This design of a FIFO queue server was proposed by Denys Duchier

Seif Haridi and Nils Franzén
Version 1.4.0 (20080702)