4 Tutorial Examples

In the following we present some simple examples that demonstrate a large part of the P2PS functionality. In order to use them, we import the P2PS library 'x-ozlib://cetic_ucl/p2ps/P2PS.ozf' in our applications.

4.1 Three peer network

This is an example of a P2P network of three peers (or nodes). It is composed of three files 'node1.oz', 'node2.oz', and 'node3.oz', representing node1, node2 and node3. Peers node1 and node2 connect to the network through node1 and respectively node2. Then, node3 sends a multicast message to node1 and node2, and a one-to-one message to the responsible of key 42. This example runs directly in the OPI (Oz Programming Interface).

The first node of a P2P network is always "special". Actually, it represents a network by itself. We create it with the method createNet. In our case, node1 decides to work on port# 3001. Further, we run a loop to wait and show the messages sent to this node.

Source File

%% node1 %%
declare 
[P2PS] = {Module.link ['x-ozlib://cetic_ucl/p2ps/P2PS.ozf']}
MS
 
% Create first node (with id 1) in a P2PS network.
% Set the port# in the access point config to 3001.
OP2PS = {New P2PS.p2pServices
         createNet(nodeConfig: nodeConfig(nodeId:1)
                   apConfig:apConfig(pn:3001)
                   msgStrm:MS)}
 
% Display each message received on the message stream.
thread 
   for M in MS do 
      {Show M}
   end 
end 

We create node2 with id 16. It joins the network via node1 specifying the Ip address and port number of node1. Note the conversion address to access point. We assume the Ip and the port are known. One could have received the access point token of node1 from another node or from node1 itself. Otherwise, a cute way of obtaining am access point in a LAN is to use the Discovery module of Mozart. Note the use of method joinNet. Further, we run a loop to wait and show the messages sent to this node.

Source File

%% node 2 %%
declare 
[P2PS] = {Module.link ['x-ozlib://cetic_ucl/p2ps/P2PS.ozf']}
MS  
 
% Build the access point token of a node (node1) in the P2PS network.
RAP = {P2PS.address2ap "127.0.0.1" 3001}
 
% Create a node with id 16 and join the network, using the token RAP.
OP2PS = {New P2PS.p2pServices
         joinNet(remoteAP:   RAP
                 nodeConfig: nodeConfig(nodeId:16)
                 apConfig:apConfig(pn:3002))}
 
% Get the message stream and display each received messages.
thread 
   for M in {OP2PS getMsgStrm($)} do 
      {Show M}
   end 
end 

Finally, we create node3 without specifying its nodeId; the node will be provided with a random id. This node chooses to join the system via node2, specifying its address and port number. Note that it could have chosen to join via any other node within the network, e.g., node1. Further on, it sends a one-to-one message to the responsible of key 42, and a multicast message to node1 and node2. The following is the code implementing node3.

Source File

declare /* node3 */ 
[P2PS] = {Module.link ['x-ozlib://cetic_ucl/p2ps/P2PS.ozf']}
 
% Build the access point token of a node (here, node2) in the P2PS network.
RAP = {P2PS.address2ap "127.0.0.1" 3002}
 
% Create a node and join the network, using the token RAP.
OP2PS = {New P2PS.p2pServices joinNet(remoteAP: RAP)}
 
% Send a message to the responsible of key 42.
{OP2PS send(dst:42 msg:sasa toResp:true)}
          
% Send a multicast message to nodes with id 1 and 16.
{OP2PS multicast(dst:[1 16] msg:hello)}

4.2 Adding dictionary primitives

The previous example shows how to build up a P2P system and how the P2PS library can be used for message exchange between peers. This example can be enriched with dictionary operations such as putVal a value to a given key, and getVal a value corresponding to a given key (note that getVal corresponds to the lookup primitive). The key can be any integer ranging from 0 to maxNetSize-1, i.e., the size of the virtual search space. The value can be any Oz value.

To form the distributed dictionary, we first add a local dictionary to each peer in the network. We also have a dictionary to be used to achieve synchronization with the getVal primitive.

LclDict  = {Dictionary.new}
TempDict = {Dictionary.new}

The following two procedures implement the insertion and respectively the retrieval of a value to/from a distributed dictionary. The remove operation can be added similarly.

proc {PutVal +Key +Val}
   {OP2PS send(dst:Key msg:put(Key Val) toResp:true)}
end 
 
proc {GetVal +Key ?Val}
   {OP2PS send(dst:Key msg:get(Key) toResp:true)}
   {Dictionary.put TempDict Key Val}
   {Wait Val}
end

Inserting value Val at key Key into the distributed dictionary translates to storing Val at the node responsible of Key. Thus, the procedure PutVal simply sends the message put to the node responsible of Key. The put message contains the key and the value fields. Procedure GetVal sends the message get to the node responsible of Key. Val is temporary stored in TempDict and bound when we get the answer to the message get from the node responsible of Key.

Now, in order to be able to interpret these messages, we have to change a bit the loop that processes the incoming messages as follows.

% get the message stream and process each received message
thread 
   for M in {OP2PS getMsgStrm($)} do 
      case M of rcvMsg(msg:Payload src:Src dst:_) then 
         % store the (key value) pair to the local dictionary
         case Payload of put(Key Val) then 
            {Dictionary.put LclDict Key Val}    
         % return the value corresponding to Key
         [] get(Key) then Val in 
            {Dictionary.get LclDict Key Val}
            {OP2PS send(dst:Src msg:ret(Key Val) toResp:true)}
         % receive the value at key Key
         [] ret(Key Val) then 
            {Dictionary.get TempDict Key Val}
            {Dictionary.remove TempDict Key}  
         else skip end 
      else skip end 
   end 
end

If we receive message put(Key Val), the value Val is stored at key Key in the local dictionary. If the message is get(Key), the value corresponding to Key is returned with the message ret(Key Val). If the message is ret(Key Val), bind Val to the variable stored at key Key in TempDict.

Note that we assumed, for simplicity, that the network does not change, i.e., there is no node joining or leaving. A more complete example would include some redundancy and use the successors of a node as a backup for when the node would fail.

4.3 Collaborative peers

The code in 'clbpeer.oz' is a stand alone application implementing a peer node using the P2PS library. Alone, each node peer can be seen as a one-node P2P network. When another node connects to it we say we have a two nodes P2P network, and so forth. In this example, each peer node periodically announces (broadcast) its presence in the network. Other peers eventually notice this and build their own membership list (a list of known nodes). At each peer, the membership list is represented as a soft state (i.e., a state that has to be kept alive). The membership list is eventually updated each time a peer joins or leaves the network.

Source File

functor 
import 
   Application
   System
 
   P2PS at 'x-ozlib://cetic_ucl/p2ps/P2PS.ozf' 
define 
   RefreshTime  = 5*1000 % the time before a node has to anounce its presence
   LifeTime     = 5*60*1000 % node default life time (5*60 sec)
    
   Args     % record with the command arguments
   OP2PS    % an instance of class P2PS.p2pServices
   MmbDict  % a memership dictionary of node ids
   LockDict % lock for MmbDict
    
   % Build a node with node id NId on local port PN .
   proc {MakeNode PN}
      try  
         OP2PS={New P2PS.p2pServices createNet(apConfig:apConfig(pn:PN))}
         {System.showInfo 'local node created on port '# 
          {OP2PS getAPConfig($)}.pn}
      catch p2ps(couldNotConfig pn:_) then 
         {System.showInfo '\n~~~ exception: problem with the port:'#PN#'\n'}
         {Application.exit 0}
      end 
 
      thread {SendAlive} end 
   end 
 
   % Join the system via the node with IP and remote port# PN.
   % LPN is the desired local port#
   proc {JoinSys IP PN LPN}
      try 
          Tn = {P2PS.address2ap IP PN}
      in 
         OP2PS={New P2PS.p2pServices
                joinNet(remoteAP:Tn apConfig: apConfig(pn:LPN))}
 
         {System.showInfo 'local node created on port '# 
          {OP2PS getAPConfig($)}.pn}
         {System.showInfo 'my node Id is: '#{OP2PS getNodeConfig($)}.nodeId}
          
         {OP2PS broadcast(msg:newnode)}
         thread {SendAlive} end 
      catch _ then 
         {System.showInfo '\n~~~ error: there was an error when joining the system\n'}
         {Application.exit 0}
      end 
   end 
 
   % Broadcast amAlive msg forever.
   proc {SendAlive}
      {Delay RefreshTime}
       
      try 
         {OP2PS broadcast(msg:alive)}
      catch _ then skip end 
       
      {SendAlive}
   end 
 
   % Process the input message stream.
   proc {ProcMsgStrm MS}
      for E in MS do 
         case E of rcvMsg(src:NId msg:M dst:_) then 
            case M of newnode then 
               {AddToMmbDict NId}
            [] alive then 
               {AddToMmbDict NId}   
            [] leave then 
               {RemFromMmbDict NId unit}    
            else skip end 
         end 
      end 
   end 
 
   % Add node Id NId to MmbDict.
   proc {AddToMmbDict NId}
      Vrs
   in 
      lock LockDict then 
         if {Dictionary.condGet MmbDict NId nil} \= nil then 
            Vrs = {Dictionary.get MmbDict NId}+1
            {Dictionary.put MmbDict NId Vrs}
         else 
            Vrs = 0
            {Dictionary.put MmbDict NId Vrs}
            {System.showInfo 'node: '#NId#' added'}
            {System.show membership#{Dictionary.keys MmbDict}}
         end 
      end 
          
      thread 
         % after RefreshTime plus some delay remove Nid
         {Delay RefreshTime+5000}  
         {RemFromMmbDict NId Vrs}
      end 
   end 
 
   % Remove node Id NId from MmbDict, if inside and the right version is used.
   proc {RemFromMmbDict NId Vrs}
      E
   in 
      lock LockDict then 
         E = {Dictionary.condGet MmbDict NId nil}
          
         if E == Vrs then 
            {Dictionary.remove MmbDict NId}
            {System.showInfo 'node: '#NId#' dropped'}
            {System.show membership#{Dictionary.keys MmbDict}}
         elseif Vrs == unit then 
            {Dictionary.remove MmbDict NId}
            {System.showInfo 'node: '#NId#' left'}
            {System.show membership#{Dictionary.keys MmbDict}}
         end 
      end 
   end 
 
in 
   MmbDict = {NewDictionary}
   LockDict = {NewLock}
    
   % specify the application input arguments
   Args = {Application.getCmdArgs
           record(host(single type:atom default:unit)          % host name
                  lport(single type:int default:_)             % local port
                  rport(single type:int default:unit)          % remote port
                  lifetime(single type:int default:LifeTime))} % node's life time
 
   if Args.host \= unit then 
      if Args.rport \= unit then 
         {JoinSys Args.host Args.rport Args.lport}
         thread {ProcMsgStrm {OP2PS getMsgStrm($)}} end 
      else 
         {System.showInfo '\n~~~ error: provide the remote port: e.g., --rport 3002\n'}
         {Application.exit 0}
      end 
   else 
      {MakeNode Args.lport}
      thread {ProcMsgStrm {OP2PS getMsgStrm($)}} end 
   end 
 
   thread 
      % after a live time leave the sys and close the application
      {Delay Args.lifetime*1000}  
 
      try 
         {OP2PS broadcast(msg:leave)}
      catch _ then skip end 
 
      {Delay 1000}  % wait some time until msg leave is sent
      {OP2PS leaveNet}
      {Application.exit 0}       
   end 
end 

4.3.1 Application parameters

--lport

The local port on which to open the access point at this node. This param is optional.

--host

The host of the node through which the join will be done. If not provided, the node will not do join to any network, i.e., it is the first node in the network.

--rport

The port number of the node through which the join will be done. If --host is provided then --rport must be provided.

--lifetime

The life time of the application (sec). Default is 5 minutes. This param is optional.

4.3.2 Running the application

The first created node should not to do any join. It is the node who creates the P2P network. The port# of the access point is displayed. So just type:

$ ./clbpeer

The forthcoming nodes must do join in order to be part of a network, thus they must provide the --host and --rport params. So you can type something like :

$ ./clbpeer --host=localhost --rport=34200


Bruno Carton and Valentin Mesaros