List Info

Thread: LAM: Threads or Recv?




LAM: Threads or Recv?
user name
2006-05-25 11:55:48
Hi, I have a strange problem.
My processes are structured as follows.

Each process has a thread that receives messages (using a non-blocking MPI receive). The thread loops forever.
The main thread searches for a value in its array of known values. If the value is not found, it tries to get it from other processes by sending them a request with a non-blocking MPI send.

I am using 6 such processes.

My problem is that I sometimes get correct results, and sometimes the program just hangs.

Is it that when many messages are sent to a process by the others at the same time, the receiving process can't handle them and misses something?

Is it a problem with threads? I use POSIX threads.

I am attaching the program, though the other functions are not that important for this question.


Any idea or suggestion regarding the implementation of concurrent threads is welcome.



Thanks in advance


Regards,
Imran


#include<iostream>
#include "mpi.h"
#include<pthread.h>
#include<signal.h>
#include<unistd.h>
#define CACHESIZE 10
#define HOPCOUNT 5
#define RESULTLIMIT 30
#define MAXRESOURCES 30
#define DISCARDTAG 9999
using namespace std;
using namespace MPI;
// Forward declarations of the helpers defined later in this file.
int search(int);
void* listen(void*);
int get_best_neighbor(int);
int getindex(int);
void sig_handler(int);
void add_cache(int,int);
int check_cache(int);

// Process-wide state shared by main() and the listener thread.
// NOTE(review): main() and listen() both touch several of these
// (respcnt, fwdcnt, expbase, cache) without any locking.
int me;             // this process's rank in MPI::COMM_WORLD
int nbrcnt;         // number of neighbours in the graph communicator
int res_dist_size;  // number of values owned by each rank (MAXRESOURCES / world size)
int reqcnt;         // lookups issued by this rank's main thread
int fwdcnt;         // incremented when main() forwards a request or listen() handles one
int respcnt;        // lookups satisfied (locally, or by a received result)
int cachecnt;       // next slot of `cache` that add_cache() writes
int* nbrs;          // neighbour ranks (nbrcnt entries)
int* expbase;       // per-neighbour "experience" score (nbrcnt entries)
int* myint;         // values owned by this rank (res_dist_size entries)
int cache[CACHESIZE][2];  // ring buffer of {requested value, neighbour it was sent to}
MPI::Graphcomm gcomm;     // graph communicator created in main()
pthread_t tid;            // listener thread started in main()

int main(int argc, char**argv)
{
int i,wsize,membershipkey;
int result;
const int index[6]={1,4,6,9,10,12};
const int edges[12]={1,0,2,5,1,3,2,4,5,3,1,3};
int nnodes;
reqcnt=fwdcnt=respcnt=cachecnt=0;
MPI::Init_thread(argc,argv,2);
me=MPI::COMM_WORLD.Get_rank();
wsize=MPI::COMM_WORLD.Get_size();
nnodes=6;
gcomm=MPI::COMM_WORLD.Create_graph(nnodes,index,edges,false);
nbrcnt=gcomm.Get_neighbors_count(me);
nbrs=new int[nbrcnt];
expbase=new int[nbrcnt];
gcomm.Get_neighbors(me,nbrcnt,nbrs);

for(int i=0;i<nbrcnt;i++)
{
expbase[i]=0;
}

res_dist_size=MAXRESOURCES/wsize;

myint=new int[res_dist_size];

 &nbsp;  for(int i=res_dist_size*me;i&lt;res_dist_size*(me+1);i++)
 &nbsp;  {
 &nbsp;  myint[i-(res_dist_size)*me]=i;
 &nbsp;  }
 &nbsp;  cout<&lt;endl;
 &nbsp; 

gcomm.Barrier();

if(pthread_create(&amp;tid,0,listen,(void*)&me)<;0)
 &nbsp;  {
 &nbsp;   &nbsp;  cout<&lt;"Thread cant be created"&lt;<endl;
 &nbsp;  }

{
 &nbsp;  int randnum;
 &nbsp;  switch(me)
 &nbsp;  {
 &nbsp;  case 0:
 &nbsp;  randnum=4;
 &nbsp;  break;
 &nbsp;  case 1:
 &nbsp;  randnum=10;
 &nbsp;  break;
 &nbsp;  case 2:
 &nbsp;  randnum=8;
 &nbsp;  break;
 &nbsp;  case 3:
 &nbsp;  randnum=10;
 &nbsp;  break;
 &nbsp;  case 4:
 &nbsp;  randnum=15;
 &nbsp;  break;
 &nbsp;  case 5:
 &nbsp;  randnum=8;
 &nbsp;  break;
 &nbsp;  default :
 &nbsp;  randnum=27;
 &nbsp;  }
 &nbsp;  int sendbuf[2];
 &nbsp;  reqcnt++;
 &nbsp;  cout<&lt;"P="<&lt;me<&lt;" RN="<&lt;randnum&lt;<endl;
 &nbsp;  int resrc_cnt=search(randnum);
 &nbsp;   &nbsp;  if(resrc_cnt>0)
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;   &nbsp; 
 &nbsp;   &nbsp;   &nbsp;  respcnt++;
 &nbsp;   &nbsp;   &nbsp;  cout<&lt;"Yahoo! P="<<;me<<" A="<<;randnum<;<" Respcnt="&lt;<respcnt<<endl;
 &nbsp;   &nbsp;  } &nbsp; 
 &nbsp;   &nbsp;  else
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;   &nbsp;  sendbuf[0]=randnum;
 &nbsp;   &nbsp;   &nbsp;  sendbuf[1]=me;
 &nbsp;   &nbsp;   &nbsp;  int bestneighbor=get_best_neighbor(me);
 &nbsp;   &nbsp;   &nbsp;  add_cache(randnum,bestneighbor);
 &nbsp;   &nbsp;   &nbsp;  MPI::Request request;
 &nbsp;   &nbsp;   &nbsp;  MPI::Status status;
 &nbsp;   &nbsp;   &nbsp;  request=gcomm.Isend(&amp;sendbuf,2,MPI::INT,bestneighbor,HOPCOUNT);
 &nbsp;   &nbsp;   &nbsp;  fwdcnt++;
 &nbsp;   &nbsp;   &nbsp;  cout<&lt;"P="<&lt;me<&lt;" NA="<&lt;randnum&lt;<" RF-->"&lt;<bestneighbor<;<endl;
 &nbsp;   &nbsp;   &nbsp;  request.Wait(status);
 &nbsp;   &nbsp;  }
}



int ret;
int *retval=&amp;ret;
pthread_join(tid,(void**)&retval);
gcomm.Free();
MPI::Finalize();


}



int search(int keyval) //returns the number of matches found
{
int resultcnt=0;
 &nbsp;  for(int k=0;k<res_dist_size;k++)
 &nbsp;  {
 &nbsp;   &nbsp;  if(keyval==myint[k])
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;  //pos=k;
 &nbsp;   &nbsp;  resultcnt++;
 &nbsp;   &nbsp;  }
 &nbsp;  }
return resultcnt;
}


void * listen(void *t)
{
int recvbuf[2];
MPI::Status status;
MPI::Request request;
static int exp_update_cnt=0;
int recvcnt=0;
int deepcntr=1;
int itrcnt=0;
while(1)
{
itrcnt++;
request=gcomm.Irecv(&amp;recvbuf,2,MPI::INT,MPI::ANY_SOURCE,MPI::ANY_TAG);
recvcnt++;
request.Wait(status);

int source,tag;
source=status.Get_source();
tag=status.Get_tag();

cout<&lt;"P= "<<me<<"Recvd: Source= "<<source<&lt;" tag= "<<tag<<endl;

if(tag==DISCARDTAG)
{
 &nbsp;  if(nbrcnt==1)
 &nbsp;  {
 &nbsp;  //increment TTL and send to same neighbor
 &nbsp;  MPI::Request request;
 &nbsp;  MPI::Status status;
 &nbsp;  request=gcomm.Isend(&amp;recvbuf,2,MPI::INT,nbrs[0],(HOPCOUNT+deepcntr));
 &nbsp;  cout<&lt;"AP="<<me<&lt;"NA="<;<recvbuf[0]<<;"RF->"&lt;<nbrs[0]<<"TTL="<&lt;HOPCOUNT+deepcntr&lt;<endl;  
 &nbsp;  deepcntr++;
 &nbsp;  request.Wait(status);
 &nbsp;  }
 &nbsp;  else //many other neighbors are there
 &nbsp;  {
 &nbsp;  int fwdnbr=check_cache(recvbuf[0]);
 &nbsp;  int bestnextnode;
 &nbsp;   &nbsp;  if(fwdnbr&gt;=0) //entry in cache not timed out.
 &nbsp;   &nbsp;  bestnextnode=get_best_neighbor(fwdnbr);
 &nbsp;   &nbsp;  else //entry in cache is not available
 &nbsp;   &nbsp;  bestnextnode=get_best_neighbor(me);
 &nbsp;   &nbsp; 
 &nbsp;  MPI::Request request;
 &nbsp;  MPI::Status status;
 &nbsp;  request=gcomm.Isend(&amp;recvbuf,2,MPI::INT,bestnextnode,HOPCOUNT);
 &nbsp;  cout<&lt;"AP="<<me<&lt;"NA="<;<recvbuf[0]<<;"RF->"&lt;<bestnextnode<;<"TTL="<<HOPCOUNT<&lt;endl;
 &nbsp;  request.Wait(status);

 &nbsp;  }
}

else if(tag>RESULTLIMIT) //results are returned.
 &nbsp;  {
 &nbsp;  if(deepcntr>1)
 &nbsp;  deepcntr--;
 &nbsp;  cout<&lt;"Yahoo! FP="<&lt;me<<;" Node="<<source&lt;<" has "<<tag-RESULTLIMIT<<" matches"&lt;< " for "<<recvbuf[0]&lt;<" Hopcount="<<recvbuf[1]<<endl;
 &nbsp;  //check cache for details of forwarded request
 &nbsp;  int fwdnbr=check_cache(recvbuf[0]);
 &nbsp;   &nbsp;  if(fwdnbr&gt;=0)
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;  int index=getindex(fwdnbr);
 &nbsp;   &nbsp;  respcnt++;
 &nbsp;   &nbsp;   &nbsp;  if(index&lt;0)
 &nbsp;   &nbsp;   &nbsp;  cout<&lt;"Neighbor index error!"<;<endl;
 &nbsp;   &nbsp;   &nbsp;  else
 &nbsp;   &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;   &nbsp;  ++expbase[index];
 &nbsp;   &nbsp;   &nbsp;  cout<&lt;"P="<&lt;me<&lt;" expbase["&lt;<index<<"]="<<expbase[index]<<endl;
 &nbsp;   &nbsp;   &nbsp;  ++exp_update_cnt;
 &nbsp;   &nbsp;   &nbsp;  }
 &nbsp;   &nbsp;  }
 &nbsp;   &nbsp;  else cout<&lt;"P="<&lt;me<&lt;" Request timed out in cache"<<endl;
 &nbsp;  }
else if (tag>0&amp;&tag<=HOPCOUNT)
 &nbsp;  {
 &nbsp;  tag=tag-1;
 &nbsp;  int resrc_cnt;
 &nbsp;  if(recvbuf[1]==me)
 &nbsp;  resrc_cnt=0;
 &nbsp;  resrc_cnt=search(recvbuf[0]);
 &nbsp;  fwdcnt++;
 &nbsp; 
 &nbsp;   &nbsp;  if(resrc_cnt>0)
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;  int *a=(int*)t;
 &nbsp;   &nbsp;  int sendbuf[2];
 &nbsp;   &nbsp;  sendbuf[0]=recvbuf[0];
 &nbsp;   &nbsp;  sendbuf[1]=HOPCOUNT-tag; //for hop count calculation
 &nbsp;   &nbsp;  MPI::Request request;
 &nbsp;   &nbsp;  MPI::Status status;
 &nbsp;   &nbsp;  request=gcomm.Isend(&amp;sendbuf,2,MPI::INT,recvbuf[1],RESULTLIMIT+resrc_cnt);
 &nbsp;   &nbsp;  cout<&lt;"P="<&lt;me<&lt;" Response sent for "<<recvbuf[0]&lt;<" to node "<<recvbuf[1]&lt;<endl;
 &nbsp;   &nbsp;  request.Wait(status);
 &nbsp;   &nbsp;  }
 &nbsp;   &nbsp;  else
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp; 
 &nbsp;   &nbsp;  int bestneighbor=get_best_neighbor(source);
 &nbsp;   &nbsp;   &nbsp;  //still tag value is positive then send.Else discard packet.
 &nbsp;   &nbsp;   &nbsp;  if(tag>0)
 &nbsp;   &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;   &nbsp;  MPI::Request request;
 &nbsp;   &nbsp;   &nbsp;  MPI::Status status;
 &nbsp;   &nbsp;   &nbsp;  request=gcomm.Isend(&amp;recvbuf,2,MPI::INT,bestneighbor,tag);
 &nbsp;   &nbsp;   &nbsp;  cout<&lt;"RF-> P= "<<me<<"&nbsp; resrc= "<<recvbuf[0]&lt;<" origin= "<<recvbuf[1]&lt;<" Prev node= "<<source<&lt;". "<<me<<" -> "<<bestneighbor<<endl;
 &nbsp;   &nbsp;   &nbsp;  request.Wait(status);
 &nbsp;   &nbsp;   &nbsp;  }
 &nbsp;   &nbsp;   &nbsp;  else
 &nbsp;   &nbsp;   &nbsp;  //send a notice to other matchmaker saying his result is discarded
 &nbsp;   &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;   &nbsp;  MPI::Request request;
 &nbsp;   &nbsp;   &nbsp;  MPI::Status status;
 &nbsp;   &nbsp;   &nbsp;  request=gcomm.Isend(&amp;recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
 &nbsp;   &nbsp;   &nbsp;  cout<&lt;"P="<&lt;me<&lt;"Request discarded"<<endl;//discarded request
 &nbsp;   &nbsp;   &nbsp;  request.Wait(status);
 &nbsp;   &nbsp;   &nbsp;  }
 &nbsp;   &nbsp; 
 &nbsp;   &nbsp;  }
 &nbsp;  }

else
 &nbsp;  {
 &nbsp;  MPI::Request request;
 &nbsp;   &nbsp;  MPI::Status status;
 &nbsp;   &nbsp;  request=gcomm.Isend(&amp;recvbuf,2,MPI::INT,recvbuf[1],DISCARDTAG);
 &nbsp;   &nbsp;  cout<&lt;"P="<&lt;me<&lt;" Invalid tag !! Request discarded :-("<&lt;endl;&nbsp; &nbsp;
 &nbsp;   &nbsp;  request.Wait(status);
 &nbsp; 
 &nbsp;  }
}


}


int get_best_neighbor(int source)
{
int max=0;
int index=-1;
if(nbrcnt&gt;1)
{

 &nbsp;  for(int i=0;i<nbrcnt;i++) //search who has good expereince
 &nbsp;  {

 &nbsp;   &nbsp;  if(expbase[i]>max &&(nbrs[i]!=source)) //take care that it is not forwarded back.
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;  max=expbase[i];
 &nbsp;   &nbsp;  index=i;
 &nbsp;   &nbsp;  }

 &nbsp;  }

 &nbsp;  if(index&gt;=0) //best neighbor found
 &nbsp;  return nbrs[index];

 &nbsp;  else //cant find who is best,randomly select one
 &nbsp;  {
 &nbsp;   &nbsp;  int temp=me;
 &nbsp;   &nbsp;  int loopcntr=0;
 &nbsp;   &nbsp;  while(1)
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;  loopcntr++;
 &nbsp;   &nbsp;  int newindex=((random()%(7+temp++))*17)%nbrcnt;
 &nbsp;   &nbsp;  if(nbrs[newindex]!=source)
 &nbsp;   &nbsp;  {
 &nbsp;   &nbsp;  return nbrs[newindex];
 &nbsp;   &nbsp;  }
 &nbsp;   &nbsp; 
 &nbsp;   &nbsp;  }

 &nbsp;  }
}

return nbrs[0];

}


int getindex(int rank)
{
 &nbsp;  for(int i=0;i<nbrcnt;i++)
 &nbsp;  {
 &nbsp;   &nbsp;  if(nbrs[i]==rank)
 &nbsp;   &nbsp;  return i;
 &nbsp;  }
 &nbsp;  cout<&lt;" P= "<<rank<<" is not a neighbor of P= "<<me<<endl;
 &nbsp;  return -1;
}


// Signal handler that reports the signal and terminates the process.
// It calls only async-signal-safe functions (write, _exit). The original called
// cout, pthread_cancel and MPI::Finalize, none of which may be called from a
// signal handler; Finalize also cannot complete while the listener thread is
// still blocked inside MPI. MPI is therefore not finalized here, and stale LAM
// state may need to be removed afterwards (e.g. with `lamclean`).
// NOTE(review): no call installing this handler is visible in this file.
void sig_handler(int signal)
{
    char line[32];
    unsigned int len = 0;

    const char prefix[] = "Received signal ";
    for (unsigned int k = 0; prefix[k] != '\0'; ++k)
        line[len++] = prefix[k];

    // Render the signal number in decimal without stdio.
    char digits[12];
    int ndigits = 0;
    unsigned int value = signal < 0 ? 0u : static_cast<unsigned int>(signal);
    do {
        digits[ndigits++] = static_cast<char>('0' + value % 10u);
        value /= 10u;
    } while (value != 0u);
    while (ndigits > 0)
        line[len++] = digits[--ndigits];
    line[len++] = '\n';

    static const char cleanup[] = "Performing cleanup...\n";
    ssize_t ignored = write(STDOUT_FILENO, line, len);
    ignored = write(STDOUT_FILENO, cleanup, sizeof cleanup - 1);
    (void)ignored;

    _exit(128 + signal);
}

// Number of cache slots written so far (saturates at CACHESIZE). Slots at or
// beyond this index still hold their zero initialisation and must not be
// matched; otherwise check_cache(0) would report a bogus hit on node 0.
static int cache_filled = 0;

// Records that a request for value `num` was forwarded to neighbour `node`.
// The cache is a ring buffer of CACHESIZE entries; the oldest entry is
// overwritten once it is full.
void add_cache(int num, int node)
{
    if (cachecnt >= CACHESIZE)
        cachecnt = 0;
    cache[cachecnt][0] = num;
    cache[cachecnt][1] = node;
    cachecnt++;
    if (cache_filled < CACHESIZE)
        cache_filled++;
}

// Returns the neighbour recorded for value `num` (first match in slot order),
// or -1 if no written slot holds `num`.
int check_cache(int num)
{
    for (int i = 0; i < cache_filled; i++) {
        if (cache[i][0] == num)
            return cache[i][1];
    }
    return -1;
}

__________________________________________________
Do You Yahoo!?
Tired of spam? Yahoo! Mail has the best spam protection around
http://mail.yahoo.com

[1]

about | contact  Other archives ( Real Estate discussion Medical topics )