| << Prev | - Up - | Next >> |
In the following we present some simple examples that demonstrate a large part of the P2PS functionality. In order to use them, we import the P2PS library 'x-ozlib://cetic_ucl/p2ps/P2PS.ozf' in our applications.
This is an example of a P2P network of three peers (or nodes). It is composed of three files 'node1.oz', 'node2.oz', and 'node3.oz', representing node1, node2 and node3. Peers node1 and node2 connect to the network through node1 and respectively node2. Then, node3 sends a multicast message to node1 and node2, and a one-to-one message to the responsible of key 42. This example runs directly in the OPI (Oz Programming Interface).
The first node of a P2P network is always "special". Actually, it represents a network by itself. We create it with the method createNet. In our case, node1 decides to work on port# 3001. Further, we run a loop to wait and show the messages sent to this node.
%% node1 %%
declare
[P2PS] = {Module.link ['x-ozlib://cetic_ucl/p2ps/P2PS.ozf']}
MS
% Create first node (with id 1) in a P2PS network.
% Set the port# in the access point config to 3001.
OP2PS = {New P2PS.p2pServices
createNet(nodeConfig: nodeConfig(nodeId:1)
apConfig:apConfig(pn:3001)
msgStrm:MS)}
% Display each message received on the message stream.
thread
for M in MS do
{Show M}
end
end
We create node2 with id 16. It joins the network via node1 specifying the Ip address and port number of node1. Note the conversion address to access point. We assume the Ip and the port are known. One could have received the access point token of node1 from another node or from node1 itself. Otherwise, a cute way of obtaining am access point in a LAN is to use the Discovery module of Mozart. Note the use of method joinNet. Further, we run a loop to wait and show the messages sent to this node.
%% node 2 %%
declare
[P2PS] = {Module.link ['x-ozlib://cetic_ucl/p2ps/P2PS.ozf']}
MS
% Build the access point token of a node (node1) in the P2PS network.
RAP = {P2PS.address2ap "127.0.0.1" 3001}
% Create a node with id 16 and join the network, using the token RAP.
OP2PS = {New P2PS.p2pServices
joinNet(remoteAP: RAP
nodeConfig: nodeConfig(nodeId:16)
apConfig:apConfig(pn:3002))}
% Get the message stream and display each received messages.
thread
for M in {OP2PS getMsgStrm($)} do
{Show M}
end
end
Finally, we create node3 without specifying its nodeId; the node will be provided with a random id. This node chooses to join the system via node2, specifying its address and port number. Note that it could have chosen to join via any other node within the network, e.g., node1. Further on, it sends a one-to-one message to the responsible of key 42, and a multicast message to node1 and node2. The following is the code implementing node3.
declare /* node3 */
[P2PS] = {Module.link ['x-ozlib://cetic_ucl/p2ps/P2PS.ozf']}
% Build the access point token of a node (here, node2) in the P2PS network.
RAP = {P2PS.address2ap "127.0.0.1" 3002}
% Create a node and join the network, using the token RAP.
OP2PS = {New P2PS.p2pServices joinNet(remoteAP: RAP)}
% Send a message to the responsible of key 42.
{OP2PS send(dst:42 msg:sasa toResp:true)}
% Send a multicast message to nodes with id 1 and 16.
{OP2PS multicast(dst:[1 16] msg:hello)}
The previous example shows how to build up a P2P system and how the P2PS library can be used for message exchange between peers. This example can be enriched with dictionary operations such as putVal a value to a given key, and getVal a value corresponding to a given key (note that getVal corresponds to the lookup primitive). The key can be any integer ranging from 0 to maxNetSize-1, i.e., the size of the virtual search space. The value can be any Oz value.
To form the distributed dictionary, we first add a local dictionary to each peer in the network. We also have a dictionary to be used to achieve synchronization with the getVal primitive.
LclDict = {Dictionary.new}
TempDict = {Dictionary.new}
The following two procedures implement the insertion and respectively the retrieval of a value to/from a distributed dictionary. The remove operation can be added similarly.
proc {PutVal +Key +Val}
{OP2PS send(dst:Key msg:put(Key Val) toResp:true)}
end
proc {GetVal +Key ?Val}
{OP2PS send(dst:Key msg:get(Key) toResp:true)}
{Dictionary.put TempDict Key Val}
{Wait Val}
end Inserting value Val at key Key into the distributed dictionary translates to storing Val at the node responsible of Key. Thus, the procedure PutVal simply sends the message put to the node responsible of Key. The put message contains the key and the value fields. Procedure GetVal sends the message get to the node responsible of Key. Val is temporary stored in TempDict and bound when we get the answer to the message get from the node responsible of Key.
Now, in order to be able to interpret these messages, we have to change a bit the loop that processes the incoming messages as follows.
% get the message stream and process each received message
thread
for M in {OP2PS getMsgStrm($)} do
case M of rcvMsg(msg:Payload src:Src dst:_) then
% store the (key value) pair to the local dictionary
case Payload of put(Key Val) then
{Dictionary.put LclDict Key Val}
% return the value corresponding to Key
[] get(Key) then Val in
{Dictionary.get LclDict Key Val}
{OP2PS send(dst:Src msg:ret(Key Val) toResp:true)}
% receive the value at key Key
[] ret(Key Val) then
{Dictionary.get TempDict Key Val}
{Dictionary.remove TempDict Key}
else skip end
else skip end
end
end If we receive message put(Key Val), the value Val is stored at key Key in the local dictionary. If the message is get(Key), the value corresponding to Key is returned with the message ret(Key Val). If the message is ret(Key Val), bind Val to the variable stored at key Key in TempDict.
Note that we assumed, for simplicity, that the network does not change, i.e., there is no node joining or leaving. A more complete example would include some redundancy and use the successors of a node as a backup for when the node would fail.
The code in 'clbpeer.oz' is a stand alone application implementing a peer node using the P2PS library. Alone, each node peer can be seen as a one-node P2P network. When another node connects to it we say we have a two nodes P2P network, and so forth. In this example, each peer node periodically announces (broadcast) its presence in the network. Other peers eventually notice this and build their own membership list (a list of known nodes). At each peer, the membership list is represented as a soft state (i.e., a state that has to be kept alive). The membership list is eventually updated each time a peer joins or leaves the network.
functor
import
Application
System
P2PS at 'x-ozlib://cetic_ucl/p2ps/P2PS.ozf'
define
RefreshTime = 5*1000 % the time before a node has to anounce its presence
LifeTime = 5*60*1000 % node default life time (5*60 sec)
Args % record with the command arguments
OP2PS % an instance of class P2PS.p2pServices
MmbDict % a memership dictionary of node ids
LockDict % lock for MmbDict
% Build a node with node id NId on local port PN .
proc {MakeNode PN}
try
OP2PS={New P2PS.p2pServices createNet(apConfig:apConfig(pn:PN))}
{System.showInfo 'local node created on port '#
{OP2PS getAPConfig($)}.pn}
catch p2ps(couldNotConfig pn:_) then
{System.showInfo '\n~~~ exception: problem with the port:'#PN#'\n'}
{Application.exit 0}
end
thread {SendAlive} end
end
% Join the system via the node with IP and remote port# PN.
% LPN is the desired local port#
proc {JoinSys IP PN LPN}
try
Tn = {P2PS.address2ap IP PN}
in
OP2PS={New P2PS.p2pServices
joinNet(remoteAP:Tn apConfig: apConfig(pn:LPN))}
{System.showInfo 'local node created on port '#
{OP2PS getAPConfig($)}.pn}
{System.showInfo 'my node Id is: '#{OP2PS getNodeConfig($)}.nodeId}
{OP2PS broadcast(msg:newnode)}
thread {SendAlive} end
catch _ then
{System.showInfo '\n~~~ error: there was an error when joining the system\n'}
{Application.exit 0}
end
end
% Broadcast amAlive msg forever.
proc {SendAlive}
{Delay RefreshTime}
try
{OP2PS broadcast(msg:alive)}
catch _ then skip end
{SendAlive}
end
% Process the input message stream.
proc {ProcMsgStrm MS}
for E in MS do
case E of rcvMsg(src:NId msg:M dst:_) then
case M of newnode then
{AddToMmbDict NId}
[] alive then
{AddToMmbDict NId}
[] leave then
{RemFromMmbDict NId unit}
else skip end
end
end
end
% Add node Id NId to MmbDict.
proc {AddToMmbDict NId}
Vrs
in
lock LockDict then
if {Dictionary.condGet MmbDict NId nil} \= nil then
Vrs = {Dictionary.get MmbDict NId}+1
{Dictionary.put MmbDict NId Vrs}
else
Vrs = 0
{Dictionary.put MmbDict NId Vrs}
{System.showInfo 'node: '#NId#' added'}
{System.show membership#{Dictionary.keys MmbDict}}
end
end
thread
% after RefreshTime plus some delay remove Nid
{Delay RefreshTime+5000}
{RemFromMmbDict NId Vrs}
end
end
% Remove node Id NId from MmbDict, if inside and the right version is used.
proc {RemFromMmbDict NId Vrs}
E
in
lock LockDict then
E = {Dictionary.condGet MmbDict NId nil}
if E == Vrs then
{Dictionary.remove MmbDict NId}
{System.showInfo 'node: '#NId#' dropped'}
{System.show membership#{Dictionary.keys MmbDict}}
elseif Vrs == unit then
{Dictionary.remove MmbDict NId}
{System.showInfo 'node: '#NId#' left'}
{System.show membership#{Dictionary.keys MmbDict}}
end
end
end
in
MmbDict = {NewDictionary}
LockDict = {NewLock}
% specify the application input arguments
Args = {Application.getCmdArgs
record(host(single type:atom default:unit) % host name
lport(single type:int default:_) % local port
rport(single type:int default:unit) % remote port
lifetime(single type:int default:LifeTime))} % node's life time
if Args.host \= unit then
if Args.rport \= unit then
{JoinSys Args.host Args.rport Args.lport}
thread {ProcMsgStrm {OP2PS getMsgStrm($)}} end
else
{System.showInfo '\n~~~ error: provide the remote port: e.g., --rport 3002\n'}
{Application.exit 0}
end
else
{MakeNode Args.lport}
thread {ProcMsgStrm {OP2PS getMsgStrm($)}} end
end
thread
% after a live time leave the sys and close the application
{Delay Args.lifetime*1000}
try
{OP2PS broadcast(msg:leave)}
catch _ then skip end
{Delay 1000} % wait some time until msg leave is sent
{OP2PS leaveNet}
{Application.exit 0}
end
end
--lportThe local port on which to open the access point at this node. This param is optional.
--hostThe host of the node through which the join will be done. If not provided, the node will not do join to any network, i.e., it is the first node in the network.
--rportThe port number of the node through which the join will be done. If --host is provided then --rport must be provided.
--lifetimeThe life time of the application (sec). Default is 5 minutes. This param is optional.
The first created node should not to do any join. It is the node who creates the P2P network. The port# of the access point is displayed. So just type:
$ ./clbpeer
The forthcoming nodes must do join in order to be part of a network, thus they must provide the --host and --rport params. So you can type something like :
$ ./clbpeer --host=localhost --rport=34200
| << Prev | - Up - | Next >> |