Oz provides a set of stateful data types. These include ports, objects, arrays, and dictionaries (hash tables). These data types are abstract in the sense that they are characterized only by the set of operations performed on the members of the type. Their implementation is always hidden; in fact, different implementations exist, but their observable behavior remains the same. For example, objects are implemented in a totally different way depending on the optimization level of the compiler. Each member is unique: conceptually, it is tagged with an Oz-name upon creation. A member is created by an explicit creation operation, and a type test operation always exists. In addition, a member ceases to exist when it is no longer accessible.
A port is one such abstract data type. A port P is an asynchronous communication channel that can be shared among several senders. A port has a stream associated with it. The operation:
{Port.new S ?P}
creates a port P and initially connects it to the variable S, which takes the role of a stream. The operation:
{Port.send P M}
will append the message M to the end of the stream associated with P. The port keeps track of the end of the stream as its next insertion point. The operation
{IsPort P ?B}
checks whether P is a port. To protect the stream S from being bound by mistake, S is actually a future. The following program shows a simple example using ports:
declare S P
P = {Port.new S}
{Browse S}
{Port.send P 1}
{Port.send P 2}
If you enter the above statements incrementally, you will observe that S becomes incrementally more defined.
S
1|_
1|2|_
Ports are more expressive abstractions than pure stream communication, which was discussed in Section 8.2, since they can be shared among multiple threads, and can be embedded in other data structures. Ports are the main message passing mechanism between threads in Oz.
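As a minimal sketch of a port shared among several threads, the following program starts one thread that consumes the stream and two threads that send to the same port. The message shapes a(1), b(1) and received(M) are made up for illustration. The relative order in which the messages of the two senders appear on the stream is not determined by the program.

```oz
declare S P in
P = {Port.new S}

% Server thread: consumes messages as they appear on the stream.
thread
   for M in S do
      {Browse received(M)}
   end
end

% Two independent sender threads share the same port.
thread {Port.send P a(1)} {Port.send P a(2)} end
thread {Port.send P b(1)} {Port.send P b(2)} end
```

The server loop never terminates on its own, because the stream of a port has no end; it simply suspends until the next message arrives.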
Ports are used as a communication entry point to servers. The program shown in Figure 9.1 defines a thread that acts as a FIFO queue server. It has two ports, one for inserting items into the queue using put, and the other for fetching items out of the queue using get. The use of single-assignment (logic) variables makes the server insensitive to the relative arrival order of get and put requests. get requests can arrive even when the queue is empty. A server is created by {NewQueueServer ?Q}. This procedure returns a record Q with features put and get, each holding a unary procedure. A client thread having access to Q can request services by invoking these procedures. Notice how results are returned through logic variables. A client requesting an item in the queue will call {Q.get I}. The server will eventually answer by binding I to an item.
declare
fun {NewQueueServer}
   Given GivePort={Port.new Given}
   Taken TakePort={Port.new Taken}
in
   Given = Taken
   queue(put:proc {$ X} {Port.send GivePort X} end
         get:proc {$ X} {Port.send TakePort X} end)
end
Figure 9.1: Concurrent Queue server, first attempt
Try the following sequence of statements. The program will not work. So, what is the problem?
declare Q in
thread Q = {NewQueueServer} end
{Q.put 1}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Q.put 2}
{Q.put 3}
The problem is that Given = Taken tries to impose equality between two futures. Remember that Given and Taken are futures, which can only be read and cannot be bound. So the thread corresponding to the queue server will suspend in the statement Given = Taken. This problem is remedied by running this statement in its own thread, as shown in Figure 9.2.
The program works as follows. {Q.put I0} {Q.put I1} ... {Q.put In} will incrementally add the elements I0 I1 ... In to the stream Given, resulting in I0|I1|...|In|<Future1>. {Q.get X0} {Q.get X1} ... {Q.get Xn} will add the elements X0 X1 ... Xn to the stream Taken, resulting in X0|X1|...|Xn|<Future2>. The equality constraint Given = Taken will bind each Xi to the corresponding Ii.
declare
fun {NewQueueServer}
   Given GivePort={Port.new Given}
   Taken TakePort={Port.new Taken}
in
   thread Given=Taken end
   queue(put:proc {$ X} {Port.send GivePort X} end
         get:proc {$ X} {Port.send TakePort X} end)
end
Figure 9.2: Concurrent Queue server
Warning: The code above is correct but, due to a limitation in the current Oz implementation, leaks memory. As a workaround, one can use the code below as a drop-in replacement.
declare
fun {NewQueueServer}
   Given GivePort={Port.new Given}
   Taken TakePort={Port.new Taken}
in
   thread
      for X in Given Y in Taken do
         X=Y
      end
   end
   queue(put:proc {$ X} {Port.send GivePort X} end
         get:proc {$ X} {Port.send TakePort X} end)
end
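The following usage sketch assumes that one of the working NewQueueServer definitions above has already been fed to the system. It issues a get request before any item has been put, to illustrate that the result variable is simply bound later. The atoms hello and world are arbitrary example items.

```oz
declare Q X Y in
Q = {NewQueueServer}

{Q.get X}        % request arrives while the queue is still empty
{Browse X}       % X is unbound at this point
{Q.put hello}    % the server thread eventually binds X to hello

{Q.put world}
{Q.get Y}        % Y is eventually bound to world
{Browse Y}
```

Because requests are matched position by position on the two streams, the first get is paired with the first put regardless of which of them arrived first.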
Ports are actually stateful data structures. A port keeps a local state that internally tracks the end of its associated stream. Oz provides two primitive devices for constructing abstract stateful data types: chunks and cells. All other subtypes of chunks can be defined in terms of chunks and cells.
A chunk is similar to a record, except that the label of a chunk is an Oz-name and there is no arity operation available on chunks. This means one can hide certain components of a chunk if the feature of the component is an Oz-name that is visible only (by lexical scoping) to user-defined operations on the chunk.
A chunk is created by the procedure {NewChunk Record}. This creates a chunk with the same components as the record, but having a unique label. The following program creates a chunk.
local X in
   X={NewChunk f(c:3 a:1 b:2)}
   {Browse X}
   {Browse X.c}
end
This will display the following.
<Ch>(a:1 b:2 c:3)
3
Warning: As a syntactic convenience, an expression E at an expression position can be equated with a variable by writing X = E, and X can then be used to refer to the value of the expression. Using this notation, the above program could be written as
local X in
   {Browse X={NewChunk f(c:3 a:1 b:2)}}
   {Browse X.c}
end
In Figure 9.3, we show an example of using the information hiding ability of chunks to implement Ports.
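Before turning to that figure, the information-hiding idea can be sketched on a smaller scale. In the program below, the names Wrap, Unwrap, Secret and the label wrapped are hypothetical and chosen only for illustration. The feature Secret is an Oz-name created inside a local scope, so code outside that scope has no way to refer to the hidden component.

```oz
declare Wrap Unwrap in
local
   Secret = {NewName}   % unforgeable feature, visible only in this scope
in
   % Hide a value inside a chunk under the secret feature.
   fun {Wrap V} {NewChunk wrapped(Secret:V)} end
   % Only code that can see Secret is able to select the component.
   fun {Unwrap C} C.Secret end
end

{Browse {Unwrap {Wrap 42}}}   % should display 42
```

Since chunks have no arity operation, a client holding a wrapped value cannot enumerate its features to discover the secret one.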
A cell could be seen as a chunk with a mutable single component. A cell is created as follows.
{NewCell X ?C}
A cell is created with the initial content X, and C is bound to the new cell. Table 9.1 shows the operations on a cell.
Operation | Description |
---|---|
{NewCell X ?C} | Creates a cell C with content X |
{Access C ?X} | Returns the content of C in X |
{Assign C Y} | Modifies the content of C to Y |
{IsCell C ?B} | Tests if C is a cell |
{Exchange C X Y} | Swaps atomically the content of C from X to Y |
Table 9.1: Cell operations
Check the following program. The last statement increments the cell by one. If we leave out thread ... end, the program deadlocks. Do you know why?
local I O X in
   I = {NewCell a} {Browse @I}
   {Assign I b} {Browse @I}
   {Assign I X} {Browse @I}
   X = 5*5
   {Exchange I O thread O+1 end} {Browse @I}
end
Cells and higher-order iterators allow conventional assignment-based programming in Oz. The following program accumulates in the cell J the value of $\sum_{i=1}^{10} i$.
declare J in
J = {NewCell 0}
{For 1 10 1
 proc {$ I}
    O N in
    {Exchange J O N}
    N = O+I
 end}
{Browse @J}
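The same Exchange idiom can be packaged behind a small interface. The sketch below is only an illustration; NewCounter, the label counter and its features inc and get are hypothetical names. The cell is reachable only through the two procedures stored in the returned record.

```oz
declare NewCounter Ctr in
fun {NewCounter}
   C = {NewCell 0}
in
   counter(inc:proc {$}
                  Old New in
                  {Exchange C Old New}   % atomically take Old, install New
                  New = Old+1
               end
           get:fun {$} @C end)
end

Ctr = {NewCounter}
{Ctr.inc}
{Ctr.inc}
{Browse {Ctr.get}}   % should display 2
```

Using Exchange rather than a separate read followed by an assignment keeps each increment atomic even if several threads call inc concurrently.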
Ports, described in Section 9.1, can be implemented by chunks and cells in a secure way, i.e. as an abstract data type that cannot be forged. The program in Figure 9.3 shows an implementation of ports. Initially an Oz-name is created locally, which is accessible only by the port operations. A port is created as a chunk that has one component, which is a cell. The cell is initialized to the stream associated with the port. The type test IsPort is done by checking for the feature PortTag. Sending a message to a port results in updating the stream atomically, and updating the cell to point to the tail of the stream.
declare Port in
local
   PortTag = {NewName} %New Oz name
   fun {NewPort S}
      C = {NewCell S} in
      {NewChunk port(PortTag:C)}
   end
   fun {IsPort ?P}
      {Chunk.hasFeature P PortTag} %Checks a chunk feature
   end
   proc {Send P M}
      Ms Mr in
      {Exchange P.PortTag Ms Mr}
      Ms = M|Mr
   end
in Port = port(new:NewPort
               is:IsPort
               send:Send)
end
Figure 9.3: Implementation of Ports by Cells and Chunks
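A brief usage sketch, assuming the definition in Figure 9.3 has been fed to the system so that Port refers to the record defined there rather than to the built-in module. The message atoms are arbitrary.

```oz
declare S P in
P = {Port.new S}         % NewPort from Figure 9.3
{Browse S}
{Port.send P hello}      % S becomes hello|_
{Port.send P world}      % S becomes hello|world|_
{Browse {Port.is P}}     % should display true
```

Each send exchanges the cell content for a fresh tail variable and then binds the old tail, which is what extends the stream by one element.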
The implementation in Figure 9.3 does not protect the stream of the port. Protection of the stream is done using a future as follows.
declare Port in
local
   PortTag = {NewName} %New Oz name
   fun {NewPort FS}
      S C = {NewCell S} in
      FS = !!S % Create a future
      {NewChunk port(PortTag:C)}
   end
   fun {IsPort ?P}
      {Chunk.hasFeature P PortTag} %Checks a chunk feature
   end
   proc {Send P M}
      Ms Mr in
      {Exchange P.PortTag Ms Mr}
      Ms = M|!!Mr
   end
in Port = port(new:NewPort
               is:IsPort
               send:Send)
end
Figure 9.4: Implementation of Ports by Cells and Chunks