/* ----------------------------------------------------------------
 *   FILE
 *	hashjoin.c
 *	
 *   DESCRIPTION
 *	Routines to handle hash join nodes
 *
 *   INTERFACE ROUTINES
 *     	ExecHashJoin
 *     	ExecInitHashJoin
 *     	ExecEndHashJoin
 *
 *   IDENTIFICATION
 *	$Header: /data/01/postgres/src/backend/executor/RCS/n_hashjoin.c,v 1.13 1992/08/04 17:37:50 mer Exp $
 * ----------------------------------------------------------------
 */

#include <sys/file.h>
#include "storage/bufmgr.h"	/* for BLCKSZ */
#include "tcop/slaves.h"
#include "executor/executor.h"
#include "planner/clauses.h"

 RcsId("$Header: /data/01/postgres/src/backend/executor/RCS/n_hashjoin.c,v 1.13 1992/08/04 17:37:50 mer Exp $");

/* ----------------------------------------------------------------
 *   	ExecHashJoin
 *
 *	This function implements the Hybrid Hashjoin algorithm.
 *	Recursive partitioning remains to be added.
 *	Note: the relation we build the hash table on is the inner
 *	      relation; the other one is the outer relation.
 *
 *	Each call returns the slot produced by ExecProject for the
 *	next joined tuple, or NULL once the outer tuples of every
 *	batch have been consumed.  The function is re-entrant across
 *	calls: the current bucket, the current tuple within that
 *	bucket, the current outer tuple slot and hashtable->curbatch
 *	are saved before returning a match and reloaded on entry.
 *
 *	Outline of one call:
 *	  1. if the previous projection was not finished
 *	     (cs_TupFromTlist), continue it;
 *	  2. on the first call, create the hash table and run the
 *	     Hash node (the inner plan) to fill it;
 *	  3. open the batch temp files if batching is in use and they
 *	     are not yet open in this process;
 *	  4. fetch outer tuples, and either save them to a later
 *	     batch's temp file (first pass only) or probe the hash
 *	     bucket for matches that also pass the qpqual.
 *
 *	NOTE(review): the "switch batch / clean up" sequence appears
 *	twice (before the main loop and at its bottom); the two copies
 *	must be kept in sync by hand.
 * ----------------------------------------------------------------
 */

/**** xxref:
 *           ExecProcNode
 ****/
TupleTableSlot				/* return: a tuple or LispNil */
ExecHashJoin(node)
    HashJoin node;			/* the hash join node */
{
    /*
     * NOTE(review): hashstate, tupType, tupValue, targetList, len and
     * tempname are declared but never referenced in this function;
     * dir and innerTupleSlot are assigned but never read.
     */
    HashJoinState	hjstate;
    HashState		hashstate;
    EState		estate;
    Plan 	  	outerNode;
    Plan		hashNode;
    List		hjclauses;
    List		clause;
    List		qual;
    ScanDirection 	dir;
    TupleTableSlot	inntuple;
    Var			outerVar;
    ExprContext		econtext;

    HashJoinTable	hashtable;
    int			bucketno;
    HashBucket		bucket;
    HeapTuple		curtuple;

    bool		qualResult;

    TupleDescriptor	tupType;
    Pointer		tupValue;
    List		targetList;
    int			len;
    
    TupleTableSlot	outerTupleSlot;
    TupleTableSlot	innerTupleSlot;
    int			nbatch;
    int			curbatch;
    File		*outerbatches;
    RelativeAddr	*outerbatchNames;
    File		*innerbatches;
    RelativeAddr	*innerbatchNames;
    RelativeAddr	*outerbatchPos;
    Var			innerhashkey;
    int			batch;
    int			batchno;
    char		*buffer;
    int			i;
    char		*tempname;
    bool		hashPhaseDone;
    char		*pos;
#ifdef sequent
    slock_t		*batchLock;
#endif

    /* ----------------
     *	get information from HashJoin node
     *
     *	only the first hash clause is used to locate the outer
     *	hash key (see get_leftop(clause) below); the whole clause
     *	list is handed to ExecScanHashBucket.
     * ----------------
     */
    hjstate =   	get_hashjoinstate(node);
    hjclauses = 	get_hashclauses(node);
    clause =		CAR(hjclauses);
    estate = 		(EState)get_state((Plan)node);
    qual = 		get_qpqual((Plan) node);
    hashNode = 		get_innerPlan((Plan) node);
    outerNode = 	get_outerPlan((Plan) node);
    hashPhaseDone = 	get_hashdone(node);

    dir =   	  	get_es_direction(estate);

    /* -----------------
     * get information from HashJoin state
     * (the probe position saved by the previous call)
     * -----------------
     */
    hashtable = 	get_hj_HashTable(hjstate);
    bucket = 		get_hj_CurBucket(hjstate);
    curtuple =		get_hj_CurTuple(hjstate);
    
    /* --------------------
     * initialize expression context
     * --------------------
     */
    econtext = 		get_cs_ExprContext((CommonState) hjstate);

    /* --------------------
     * if the last call left a projection unfinished, keep
     * returning results from it until it reports done
     * --------------------
     */
    if (get_cs_TupFromTlist((CommonState)hjstate)) {
	TupleTableSlot  result;
	bool		isDone;

	result = ExecProject(get_cs_ProjInfo((CommonState)hjstate), &isDone);
	if (!isDone)
	    return result;
    }
    /* ----------------
     *	if this is the first call, build the hash table for inner relation
     * ----------------
     */
    if (!hashPhaseDone) {  /* if the hash phase not completed */
	hashtable = get_hashjointable(node);
        if (hashtable == NULL) { /* if the hash table has not been created */
	    /* ----------------
	     * create the hash table
	     * ----------------
	     */
	    hashtable = ExecHashTableCreate(hashNode);
	    set_hj_HashTable(hjstate, hashtable);
	    innerhashkey = get_hashkey((Hash) hashNode);
	    set_hj_InnerHashKey(hjstate, innerhashkey);

	    /* ----------------
	     * execute the Hash node, to build the hash table 
	     * (the returned slot is not used here)
	     * ----------------
	     */
	    set_hashtable((Hash) hashNode, hashtable);
	    innerTupleSlot = ExecProcNode(hashNode);
	}
	/* start probing from scratch; curbatch is reloaded from the table below */
	bucket = NULL;
	curtuple = NULL;
	curbatch = 0;
	set_hashdone(node, true);
    }
    else if (hashtable == NULL && !IsMaster && ParallelExecutorEnabled()) {
	/*
	 * hash phase already done but this (non-master) process has no
	 * table yet: attach to the shared-memory hash table identified
	 * by the key and size stored in the plan node.
	 */
	IpcMemoryId  shmid;
	IpcMemoryKey hashjointablekey;
	int          hashjointablesize;

	hashjointablekey = get_hashjointablekey(node);
	hashjointablesize = get_hashjointablesize(node);
	/* ----------------
	 *      in Sequent version, shared memory is implemented by
	 *  memory mapped files, it takes one file descriptor.
	 *  we may have to free one for this.
	 * ----------------
	 */
	closeOneVfd();
	shmid = IpcMemoryCreateWithoutOnExit(hashjointablekey,
					    hashjointablesize,
					    HASH_PERMISSION);
	hashtable = (HashJoinTable) IpcMemoryAttach(shmid);
	set_hashjointable(node, hashtable);
	set_hj_HashTable(hjstate, hashtable);
	set_hj_HashTableShmId(hjstate, shmid);
    }
    /*
     * NOTE(review): hashtable is dereferenced unconditionally from here
     * on.  If the hash phase is done, hj_HashTable is NULL (it is reset
     * to NULL when the join finishes) and the branch above is not taken,
     * this looks like a NULL dereference -- confirm that callers never
     * call again after NULL has been returned.
     */
    nbatch = hashtable->nbatch;
    outerbatches = get_hj_OuterBatches(hjstate);
    if (nbatch > 0 && outerbatches == NULL) {  /* if needs hash partition */
	/* -----------------
	 *  allocate space for file descriptors of outer batch files
	 *  then open the batch files in the current process
	 * -----------------
	 */
	innerhashkey = get_hashkey((Hash) hashNode);
	set_hj_InnerHashKey(hjstate, innerhashkey);
        outerbatchNames = (RelativeAddr*)
	    ABSADDR(hashtable->outerbatchNames);
	outerbatches = (File*)
	    palloc(nbatch * sizeof(File));
	for (i=0; i<nbatch; i++) {
	    outerbatches[i] = FileNameOpenFile(
				  ABSADDR(outerbatchNames[i]), 
				  O_CREAT | O_RDWR, 0600);
	}
	set_hj_OuterBatches(hjstate, outerbatches);
	if (ParallelExecutorEnabled()) {
	    /* parallel case: open the inner batch files by name as well */
            innerbatchNames = (RelativeAddr*)
		               ABSADDR(hashtable->innerbatchNames);
	    innerbatches = (File*)palloc(nbatch * sizeof(File));
	    for (i=0; i<nbatch; i++) {
		innerbatches[i] = FileNameOpenFile(
				      ABSADDR(innerbatchNames[i]),
				      O_CREAT | O_RDWR, 0600);
	    }
	    set_hj_InnerBatches(hjstate, innerbatches);
	}
	else {
	    /* ------------------
	     *  get the inner batch file descriptors from the
	     *  hash node
	     * ------------------
	     */
	    set_hj_InnerBatches(hjstate, 
				get_hashBatches(get_hashstate((Hash)
							      hashNode)));
	}
    }
    /* per-batch write positions and the current batch live in the table itself */
    outerbatchPos = (RelativeAddr*)ABSADDR(hashtable->outerbatchPos);
    curbatch = hashtable->curbatch;
#ifdef sequent
    batchLock = (slock_t*)ABSADDR(hashtable->batchLock);
#endif
    outerbatchNames = (RelativeAddr*)ABSADDR(hashtable->outerbatchNames);
	
    /* ----------------
     *	Now get an outer tuple and probe into the hash table for matches
     * ----------------
     */
    outerTupleSlot = 	get_cs_OuterTupleSlot((CommonState) hjstate);
    outerVar =   	get_leftop(clause);
    
    bucketno = -1;  /* if bucketno remains -1, means use old outer tuple */
    if (TupIsNull((Pointer) outerTupleSlot)) {
	/*
	 * if the current outer tuple is nil, get a new one
	 */
	outerTupleSlot = (TupleTableSlot)
	    ExecHashJoinOuterGetTuple(outerNode, hjstate);
	
	while (curbatch <= nbatch && TupIsNull((Pointer) outerTupleSlot)) {
	/*
	 * if the current batch runs out, switch to new batch
	 */
	    curbatch = ExecHashJoinNewBatch(hjstate);
	    if (curbatch > nbatch) {
	    /*
	     * when the last batch runs out, clean up
	     */
#ifdef sequent
	    /* ---------------
	     *  we want to make sure that only the last process does
	     *  the cleanup.
	     * ---------------
	     */
		if (ParallelExecutorEnabled()) {
		    S_LOCK(&(hashtable->tableLock));
		    if (--(hashtable->pcount) > 0) {
		       S_UNLOCK(&(hashtable->tableLock));
		       return NULL;
		      }
		    S_UNLOCK(&(hashtable->tableLock));
		  }
#endif
		if (!IsMaster && ParallelExecutorEnabled()) {
		    /* ----------------
		     *  set the shmid to the one of the current process
		     * ----------------
		     */
		    hashtable->shmid = get_hj_HashTableShmId(hjstate);
		   }
		ExecHashTableDestroy(hashtable);
		set_hj_HashTable(hjstate, NULL);
		return NULL;
	      }
	    else
	      outerTupleSlot = (TupleTableSlot)
		  ExecHashJoinOuterGetTuple(outerNode, hjstate);
	  }
	/*
	 * now we get an outer tuple, find the corresponding bucket for
	 * this tuple from the hash table
	 */
	set_ecxt_outertuple(econtext, outerTupleSlot);
	
#ifdef HJDEBUG
	printf("Probing ");
#endif
	bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
	bucket=(HashBucket)(ABSADDR(hashtable->top) 
			    + bucketno * hashtable->bucketsize);
    }
    
    for (;;) {
    /* ----------------
     *	Now we've got an outer tuple and the corresponding hash bucket,
     *  but this tuple may not belong to the current batch.
     *
     *  Only during the first pass (curbatch == 0) and only for a
     *  freshly fetched outer tuple (bucketno != -1) is the batch
     *  number computed; otherwise the tuple is treated as belonging
     *  to the current batch (batch = 0).
     * ----------------
     */
	if (curbatch == 0 && bucketno != -1)  /* if this is the first pass */
	   batch = ExecHashJoinGetBatch(bucketno, hashtable, nbatch);
	else
	   batch = 0;
	if (batch > 0) {
	     /*
	      * if the current outer tuple does not belong to
	      * the current batch, save to the tmp file for
	      * the corresponding batch.
	      *
	      * each batch has its own BLCKSZ-sized write buffer inside
	      * hashtable->batch; outerbatchPos[] remembers where the
	      * next tuple goes in that buffer.
	      */
	     buffer = ABSADDR(hashtable->batch) + (batch - 1) * BLCKSZ;
	     batchno = batch - 1;
#ifdef sequent
	     /* ---------------
	      *  lock the batch to write
	      * ---------------
	      */
	     if (ParallelExecutorEnabled())
		 S_LOCK(&(batchLock[batchno]));
#endif
	     pos  = ExecHashJoinSaveTuple((HeapTuple)
					  SlotContents(outerTupleSlot), 
					  buffer,
				     	  outerbatches[batchno],
					  ABSADDR(outerbatchPos[batchno]));
	     
	     outerbatchPos[batchno] = RELADDR(pos);
#ifdef sequent
	     /* ---------------
	      *  unlock the batch to write
	      * ---------------
	      */
	     if (ParallelExecutorEnabled())
		 S_UNLOCK(&(batchLock[batchno]));
#endif
	  }
	else if (bucket != NULL) {
	    do {
		/*
		 * scan the hash bucket for matches
		 * (curtuple is the resume point: NULL starts the bucket
		 * over, otherwise the scan continues after that tuple --
		 * presumably; confirm against ExecScanHashBucket)
		 */
		curtuple = ExecScanHashBucket(hjstate,
					      bucket,
					      curtuple,
					      hjclauses,
					      econtext);
		
		if (curtuple != NULL) {
		    /*
		     * we've got a match, but still need to test qpqual
		     */
                    inntuple = (TupleTableSlot)
			ExecStoreTuple((Pointer)curtuple, 
                                       (Pointer) get_hj_HashTupleSlot(hjstate),
				       InvalidBuffer,
                                       false); /* don't pfree this tuple */
		    
		    set_ecxt_innertuple(econtext, inntuple);

		    /* ----------------
		     * test to see if we pass the qualification
		     * ----------------
		     */
		    qualResult = ExecQual(qual, econtext);
		    
		    /* ----------------
		     * if we pass the qual, then save state for next call and
		     * have ExecProject form the projection, store it
		     * in the tuple table, and return the slot.
		     * ----------------
		     */
		    if (qualResult) {
			ProjectionInfo	projInfo;
			TupleTableSlot  result;
			bool            isDone;
			
			set_hj_CurBucket(hjstate, bucket);
			set_hj_CurTuple(hjstate, curtuple);
			hashtable->curbatch = curbatch;
			set_cs_OuterTupleSlot((CommonState)
					      hjstate, outerTupleSlot);
			
			projInfo = get_cs_ProjInfo((CommonState) hjstate);
			result = ExecProject(projInfo, &isDone);
			/* remember whether the projection has more to return */
			set_cs_TupFromTlist((CommonState)hjstate, !isDone);
			return result;
		    }
		}
	    }
	    while (curtuple != NULL);
	}

	/* ----------------
	 *   Now the current outer tuple has run out of matches,
	 *   so we free it and get a new outer tuple.
	 * ----------------
	 */
	outerTupleSlot = (TupleTableSlot)
	    ExecHashJoinOuterGetTuple(outerNode, hjstate);
	
	while (curbatch <= nbatch && TupIsNull((Pointer) outerTupleSlot)) {
	/*
	 * if the current batch runs out, switch to new batch
	 */
	   curbatch = ExecHashJoinNewBatch(hjstate);
	   if (curbatch > nbatch) {
	    /*
	     * when the last batch runs out, clean up
	     */
#ifdef sequent
	    /* ---------------
	     *  we want to make sure that only the last process does
	     *  the cleanup.
	     * ---------------
	     */
		if (ParallelExecutorEnabled()) {
		    S_LOCK(&(hashtable->tableLock));
		    if (--(hashtable->pcount) > 0) {
		       S_UNLOCK(&(hashtable->tableLock));
		       return NULL;
		      }
		    S_UNLOCK(&(hashtable->tableLock));
		  }
#endif
		if (!IsMaster && ParallelExecutorEnabled()) {
		    /* ----------------
		     *  set the shmid to the one of the current process
		     * ----------------
		     */
		    hashtable->shmid = get_hj_HashTableShmId(hjstate);
		   }
		ExecHashTableDestroy(hashtable);
		set_hj_HashTable(hjstate, NULL);
		return NULL;
	     }
	   else
	      outerTupleSlot = (TupleTableSlot)
		  ExecHashJoinOuterGetTuple(outerNode, hjstate);
	  }
	
	/* ----------------
	 *   Now get the corresponding hash bucket for the new
	 *   outer tuple.
	 * ----------------
	 */
	set_ecxt_outertuple(econtext, outerTupleSlot);
#ifdef HJDEBUG
	printf("Probing ");
#endif
	bucketno = ExecHashGetBucket(hashtable, econtext, outerVar);
	bucket=(HashBucket)(ABSADDR(hashtable->top) 
			    + bucketno * hashtable->bucketsize);
	/* new outer tuple: scan its bucket from the beginning */
	curtuple = NULL;
    }
}
 
/* ----------------------------------------------------------------
 *   	ExecInitHashJoin
 *
 *	Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
/**** xxref:
 *           ExecInitNode
 ****/
List	/* return: initialization status */
ExecInitHashJoin(node, estate, parent)
    HashJoin 	node;
    EState 	estate;
    Plan	parent;
{
    HashJoinState	hjstate;
    List		targetList;
    int			len;
    TupleDescriptor	tupType;
    Pointer	        tupValue;
    ParamListInfo       paraminfo;
    ExprContext	        econtext;
    int			baseid;
    
    Plan 	  	outerNode;
    Plan		hashNode;
    
    /* ----------------
     *  assign the node's execution state
     * ----------------
     */
    set_state((Plan)node, (EStatePtr) estate);
    
    /* ----------------
     * create state structure
     * ----------------
     */
    hjstate =
	MakeHashJoinState((HashJoinTable)NULL,	/* this is a little silly */
			  (IpcMemoryId)0,
			  (HashBucket )NULL,
			  (HeapTuple )NULL,
			  (OverflowTuple )NULL,
			  (Var)LispNil,
			  (FileP)NULL,
			  (FileP)NULL,
			  (charP)NULL,
			  (int)0,
			  (Pointer)NULL,
			  (Pointer)NULL);

    set_hashjoinstate(node, hjstate);
        
    /* ----------------
     *  Miscellanious initialization
     *
     *	     +	assign node's base_id
     *       +	assign debugging hooks and
     *       +	create expression context for node
     * ----------------
     */
    ExecAssignNodeBaseInfo(estate, (BaseNode) hjstate, parent);
    ExecAssignDebugHooks((Plan) node, (BaseNode) hjstate);
    ExecAssignExprContext(estate, (CommonState) hjstate);

#define HASHJOIN_NSLOTS 2
    /* ----------------
     *	tuple table initialization
     * ----------------
     */
    ExecInitResultTupleSlot(estate, (CommonState) hjstate);    
    ExecInitOuterTupleSlot(estate,  hjstate);    
    
    /* ----------------
     * initializes child nodes
     * ----------------
     */
    outerNode = get_outerPlan((Plan) node);
    hashNode  = get_innerPlan((Plan) node);
    
    ExecInitNode(outerNode, estate, (Plan) node);
    ExecInitNode(hashNode,  estate, (Plan) node);
    
    /* ----------------
     *	now for some voodoo.  our temporary tuple slot
     *  is actually the result tuple slot of the Hash node
     *  (which is our inner plan).  we do this because Hash
     *  nodes don't return tuples via ExecProcNode() -- instead
     *  the hash join node uses ExecScanHashBucket() to get
     *  at the contents of the hash table.  -cim 6/9/91
     * ----------------
     */
    {
	HashState      hashstate  = get_hashstate((Hash) hashNode);
	TupleTableSlot slot 	  =
	    get_cs_ResultTupleSlot((CommonState)hashstate);
	set_hj_HashTupleSlot(hjstate, (Pointer) slot);
    }
    (void)ExecSetSlotDescriptor(get_hj_OuterTupleSlot(hjstate),
				ExecGetTupType(outerNode));
    ExecSetSlotExecDescriptor(get_hj_OuterTupleSlot(hjstate),
				ExecGetExecTupDesc(outerNode));
			      
    /* ----------------
     * 	initialize tuple type and projection info
     * ----------------
     */
    ExecAssignResultTypeFromTL((Plan) node, (CommonState) hjstate);
    ExecAssignProjectionInfo((Plan) node, (CommonState) hjstate);

    /* ----------------
     *	XXX comment me
     * ----------------
     */
    if (IsMaster && ParallelExecutorEnabled())
    {
        set_hj_HashTable(hjstate, get_hashjointable(node));
    }
    else
    {
        set_hj_HashTable(hjstate, NULL);
    }
    
    set_hashdone(node, false);
    
    set_hj_HashTableShmId(hjstate, 0);
    set_hj_CurBucket(hjstate, NULL);
    set_hj_CurTuple(hjstate, NULL);
    set_hj_InnerHashKey(hjstate, NULL);
    set_hj_OuterBatches(hjstate, NULL);
    set_hj_InnerBatches(hjstate, NULL);
    set_hj_OuterReadPos(hjstate, NULL);
    set_hj_OuterReadBlk(hjstate, 0);
    
    set_cs_OuterTupleSlot((CommonState) hjstate, (TupleTableSlot) NULL);
    set_cs_TupFromTlist((CommonState) hjstate, (bool) false);

    /* ----------------
     *  return true
     * ----------------
     */
    return
	LispTrue;
}
 
/*
 * ExecCountSlotsHashJoin
 *
 *	Total number of tuple table slots needed by a HashJoin node:
 *	the slots of its outer and inner subplans plus the
 *	HASHJOIN_NSLOTS it uses itself.
 */
int
ExecCountSlotsHashJoin(node)
    Plan node;
{
    int		subplanSlots;

    subplanSlots = ExecCountSlotsNode(get_outerPlan(node)) +
		   ExecCountSlotsNode(get_innerPlan(node));

    return subplanSlots + HASHJOIN_NSLOTS;
}
 
/* ----------------------------------------------------------------
 *   	ExecEndHashJoin
 *
 *   	clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
 
/**** xxref:
 *           ExecEndNode
 ****/
void
ExecEndHashJoin(node)
    HashJoin node;
{
    HashJoinState   hjstate;
    
    /* ----------------
     *	get info from the HashJoin state 
     * ----------------
     */
    hjstate = get_hashjoinstate(node);

    /* ----------------
     * free hash table in case we end plan before all tuples are retrieved
     * ---------------
     */
    if (get_hj_HashTable(hjstate)) {
	ExecHashTableDestroy(get_hj_HashTable(hjstate));
	set_hj_HashTable(hjstate, NULL);
      }

    /* ----------------
     *	Free the projection info and the scan attribute info
     *
     *  Note: we don't ExecFreeResultType(hjstate) 
     *        because the rule manager depends on the tupType
     *	      returned by ExecMain().  So for now, this
     *	      is freed at end-transaction time.  -cim 6/2/91     
     * ----------------
     */    
    ExecFreeProjectionInfo((CommonState) hjstate);

    /* ----------------
     * clean up subtrees 
     * ----------------
     */
    ExecEndNode(get_outerPlan((Plan) node));
    ExecEndNode(get_innerPlan((Plan) node));

    /* ----------------
     *  clean out the tuple table
     * ----------------
     */
    ExecClearTuple((Pointer) get_cs_ResultTupleSlot((CommonState) hjstate));
    ExecClearTuple(get_hj_OuterTupleSlot(hjstate));
    ExecClearTuple(get_hj_HashTupleSlot(hjstate));

} 

/* ----------------------------------------------------------------
 *   	ExecHashJoinOuterGetTuple
 *
 *   	Fetch the next outer tuple for the hash join.  During the
 *	first pass (current batch 0) the tuple comes from executing
 *	the given plan node; afterwards it is read back from the tmp
 *	file of the current batch, and the read position/block kept
 *	in the join state are updated.
 * ----------------------------------------------------------------
 */
 
TupleTableSlot
ExecHashJoinOuterGetTuple(node, hjstate)
    Plan node;
    HashJoinState hjstate;
{
    HashJoinTable	table;
    File 		*batchFiles;
    char 		*readBuf;
    char 		*readPos;
    int 		readBlk;
    int			batchIndex;
    TupleTableSlot	result;

    table = get_hj_HashTable(hjstate);

    /* first pass: tuples come straight from the outer plan */
    if (table->curbatch == 0)
	return ExecProcNode(node);

    /*
     * later passes: pick up where the previous read of the current
     * batch file stopped
     */
    batchIndex = table->curbatch - 1;
    batchFiles = get_hj_OuterBatches(hjstate);
    readPos = get_hj_OuterReadPos(hjstate);
    readBlk = get_hj_OuterReadBlk(hjstate);

    /* with the parallel executor the read buffer is offset by groupPid blocks */
    readBuf = ParallelExecutorEnabled()
	? ABSADDR(table->readbuf) + SlaveInfoP[MyPid].groupPid * BLCKSZ
	: ABSADDR(table->readbuf);

    result = ExecHashJoinGetSavedTuple(hjstate,
				       readBuf,
				       batchFiles[batchIndex],
				       get_hj_OuterTupleSlot(hjstate),
				       &readBlk,
				       &readPos);

    /* remember the new read position for the next call */
    set_hj_OuterReadPos(hjstate, readPos);
    set_hj_OuterReadBlk(hjstate, readBlk);

    return result;
}

/* ----------------------------------------------------------------
 *   	ExecHashJoinGetSavedTuple
 *
 *   	read the next tuple from a tmp file using a certain buffer
 * ----------------------------------------------------------------
 */
 
TupleTableSlot
ExecHashJoinGetSavedTuple(hjstate, buffer, file, tupleSlot, block, position)
    HashJoinState hjstate;    
    char 	  *buffer;
    File 	  file;
    Pointer	  tupleSlot;
    int 	  *block;       /* return parameter */
    char 	  **position;  /* return parameter */
{
    char 	*bufstart;
    char 	*bufend;
    int	 	cc;
    HeapTuple 	heapTuple;
    HashJoinTable hashtable;

    hashtable = get_hj_HashTable(hjstate);
    bufend = buffer + *(long*)buffer;
    bufstart = (char*)(buffer + sizeof(long));
    if ((*position == NULL) || (*position >= bufend)) {
	if (*position == NULL)
	    if (ParallelExecutorEnabled())
		(*block) = SlaveInfoP[MyPid].groupPid;
	    else
	        (*block) = 0;
	else
	    if (ParallelExecutorEnabled())
		(*block) += hashtable->nprocess;
	    else
	        (*block)++;
	FileSeek(file, *block * BLCKSZ, L_SET);
	cc = FileRead(file, buffer, BLCKSZ);
	NDirectFileRead++;
	if (cc < 0)
	    perror("FileRead");
	if (cc == 0)  /* end of file */
	    return NULL;
	else
	    (*position) = bufstart;
    }
    heapTuple = (HeapTuple) (*position);
    (*position) = (char*)LONGALIGN(*position + heapTuple->t_len);

    return (TupleTableSlot)
	ExecStoreTuple((Pointer) heapTuple,
		       tupleSlot,
		       InvalidBuffer,
		       false);
}

/* ----------------------------------------------------------------
 *   	ExecHashJoinNewBatch
 *
 *   	switch to a new hashjoin batch
 *
 *	Called when the outer tuples of the current batch are
 *	exhausted.  Returns the number of the batch that is now
 *	current; a value greater than hashtable->nbatch means there
 *	is no batch left.
 *
 *	Steps performed (by the last process to arrive, when the
 *	parallel executor is enabled; by the only process otherwise):
 *	  - at the end of the first pass, write out the partially
 *	    filled page buffer of every outer batch file;
 *	  - unlink the outer batch file that was just finished;
 *	  - skip (and unlink) batches whose inner batch is empty;
 *	  - reset the hash table for the new batch.
 *	Then every process loads tuples of the new inner batch file
 *	into the hash table, and the last one to finish unlinks that
 *	inner batch file and resets the outer read position.
 *
 *	I/O errors from FileSeek/FileWrite are reported with perror()
 *	and otherwise ignored.
 * ----------------------------------------------------------------
 */
 
int
ExecHashJoinNewBatch(hjstate)
    HashJoinState hjstate;
{
    File 		*innerBatches;
    File 		*outerBatches;
    int 		*innerBatchSizes;
    Var 		innerhashkey;
    HashJoinTable 	hashtable;
    int 		nbatch;
    char 		*readPos;
    int 		readBlk;
    char 		*readBuf;
    TupleTableSlot 	slot;
    ExprContext 	econtext;
    int 		i;
    int 		cc;
    int			newbatch;

    hashtable = get_hj_HashTable(hjstate);
    outerBatches = get_hj_OuterBatches(hjstate);
    innerBatches = get_hj_InnerBatches(hjstate);
    nbatch = hashtable->nbatch;
    newbatch = hashtable->curbatch + 1;
#ifdef sequent
    /* -----------------
     *  We want to make sure that only the last process does
     *  the cleanup and switching to the new batch  and all
     *  the other processes have to wait until the entire batch
     *  has been finished.  This takes a counter, hashtable->pcount
     *  and a barrier, hashtable->batchBarrier to achieve
     * -----------------
     */
    if (ParallelExecutorEnabled())
        S_LOCK(&(hashtable->tableLock));
#endif
    if (ParallelExecutorEnabled() && --(hashtable->pcount) > 0) {
#ifdef sequent
	  /* ----------------
	   *  if it is not the last process, wait on the barrier
	   * ----------------
	   */
	  S_UNLOCK(&(hashtable->tableLock));
          S_WAIT_BARRIER(&(hashtable->batchBarrier));
#endif
	  /*
	   * NOTE(review): newbatch here is still curbatch + 1 as read on
	   * entry; it is not advanced past empty inner batches that the
	   * last process skips in the other branch.  Looks like processes
	   * could disagree on the batch number -- confirm.
	   */
	  if (newbatch > nbatch)
	      return newbatch;
       }
    else {
	/* ------------------
	 *  this is the last process, so it will do the cleanup and
	 *  batch-switching.
	 *  (also the path taken when the parallel executor is off)
	 * ------------------
	 */
	if (newbatch == 1) {
	    /* 
	     * if it is end of the first pass, flush all the last pages for
	     * the batches.
	     * every batch's page buffer is appended, whether or not
	     * anything was saved into it.
	     */
	    outerBatches = get_hj_OuterBatches(hjstate);
	    for (i=0; i<nbatch; i++) {
		cc = FileSeek(outerBatches[i], 0L, L_XTND);
		if (cc < 0)
		    perror("FileSeek");
		cc = FileWrite(outerBatches[i],
			       ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ);
		NDirectFileWrite++;
		if (cc < 0)
		    perror("FileWrite");
	    }
	}
	if (newbatch > 1) {
	    /*
	     * remove the previous outer batch
	     * (batch numbers are 1-based, the file arrays 0-based)
	     */
	    FileUnlink(outerBatches[newbatch - 2]);
	}
	/*
	 * rebuild the hash table for the new inner batch
	 */
	innerBatchSizes = (int*)ABSADDR(hashtable->innerbatchSizes);
	/* --------------
	 *  skip over empty inner batches
	 *  (an outer batch with no inner tuples cannot produce matches,
	 *  so both of its files are removed)
	 * --------------
	 */
	while (newbatch <= nbatch && innerBatchSizes[newbatch - 1] == 0) {
	   FileUnlink(outerBatches[newbatch-1]);
	   FileUnlink(innerBatches[newbatch-1]);
	   newbatch++;
	  }
	if (newbatch > nbatch) {
	   /* no batch left: restore the process counter and report that */
	   hashtable->pcount = hashtable->nprocess;
#ifdef sequent
	   /*
	    * NOTE(review): unlike the S_LOCK above, this unlock and
	    * barrier wait are not guarded by ParallelExecutorEnabled()
	    * -- confirm this is intended for the non-parallel case.
	    */
	   S_UNLOCK(&(hashtable->tableLock));
           S_WAIT_BARRIER(&(hashtable->batchBarrier));
#endif
	   return newbatch;
	 }
	ExecHashTableReset(hashtable, innerBatchSizes[newbatch - 1]);

#ifdef sequent
	/* -------------------
	 *  batch change completed
	 *  release the barrier, then reset the barrier for the next batch
	 * -------------------
	 */
        if (ParallelExecutorEnabled()) {
	   S_WAIT_BARRIER(&(hashtable->batchBarrier));
	   S_INIT_BARRIER(&(hashtable->batchBarrier), hashtable->nprocess);
	   S_UNLOCK(&(hashtable->tableLock));
	  }
#endif
    }
    /*
     * build phase: read tuples of the new inner batch file and insert
     * them into the hash table.  readPos == NULL makes
     * ExecHashJoinGetSavedTuple start from the beginning of the file.
     */
    econtext = get_cs_ExprContext((CommonState) hjstate);
    innerhashkey = get_hj_InnerHashKey(hjstate);
    readPos = NULL;
    readBlk = 0;
    if (ParallelExecutorEnabled())
       /* ----------------------
	*  build the hash table of the new batch in parallel
	*  (each process uses the page of the read buffer selected
	*  by its groupPid)
	* ----------------------
	*/
       readBuf = ABSADDR(hashtable->readbuf) + 
		 SlaveInfoP[MyPid].groupPid * BLCKSZ;
    else
       readBuf = ABSADDR(hashtable->readbuf);

    while ((slot = ExecHashJoinGetSavedTuple(hjstate,
					    readBuf, 
					    innerBatches[newbatch-1],
					    get_hj_HashTupleSlot(hjstate),
					    &readBlk,
					    &readPos))
	   && ! TupIsNull((Pointer) slot)) {
	set_ecxt_innertuple(econtext, slot);
	ExecHashTableInsert(hashtable, econtext, innerhashkey,NULL);
				/* possible bug - glass */
    }
    
    
#ifdef sequent
    /* ---------------
     *  we want to make sure that the processes proceed to the probe
     *  phase after all the processes finish the build phase
     * ---------------
     */
    if (ParallelExecutorEnabled())
	S_LOCK(&(hashtable->tableLock));
#endif
    if (ParallelExecutorEnabled() && --(hashtable->pcount) > 0) {
#ifdef sequent
	/* ----------------
	 *  if not the last process, wait on the barrier
	 * ----------------
	 */
	S_UNLOCK(&(hashtable->tableLock));
	S_WAIT_BARRIER(&(hashtable->batchBarrier));
#endif
       }
    else {
	/* -----------------
	 *  only the last process comes to this branch
	 *  now all the processes have finished the build phase
	 * ----------------
	 */

	/*
	 * after we build the hash table, the inner batch is no longer needed
	 */
	FileUnlink(innerBatches[newbatch - 1]);
	/* the next outer read starts at the beginning of the new batch file */
	set_hj_OuterReadPos(hjstate, NULL);
	hashtable->pcount = hashtable->nprocess;
#ifdef sequent
	/* -----------------
	 *  release the barrier, then reset it for the next batch
	 * -----------------
	 */
	if (ParallelExecutorEnabled()) {
	    S_WAIT_BARRIER(&(hashtable->batchBarrier));
	    S_INIT_BARRIER(&(hashtable->batchBarrier), hashtable->nprocess);
	    S_UNLOCK(&(hashtable->tableLock));
	  }
#endif
       }
    /* publish the new batch number in the (possibly shared) hash table */
    hashtable->curbatch = newbatch;
    return newbatch;
}

/* ----------------------------------------------------------------
 *   	ExecHashJoinGetBatch
 *
 *   	Determine the batch number for a bucketno.
 *
 *	Buckets below hashtable->nbuckets belong to batch 0; the
 *	remaining buckets, up to hashtable->totalbuckets, are spread
 *	proportionally over batches 1..nbatch:
 *
 *      +----------------+-------+-------+ ... +-------+
 *	0             nbuckets                       totalbuckets
 * batch         0           1       2     ...
 *
 *	Returns 0 when nbatch is 0.
 * ----------------------------------------------------------------
 */
 
int
ExecHashJoinGetBatch(bucketno, hashtable, nbatch)
int bucketno;
HashJoinTable hashtable;
int nbatch;
{
    int scaled;

    if (bucketno >= hashtable->nbuckets && nbatch != 0) {
	/*
	 * fraction of the way through the overflow bucket range, scaled
	 * to the number of batches and truncated to an integer
	 */
	scaled = (float)(bucketno - hashtable->nbuckets) /
	    (float)(hashtable->totalbuckets - hashtable->nbuckets) *
	    nbatch;
	return scaled + 1;
    }

    return 0;
}

/* ----------------------------------------------------------------
 *   	ExecHashJoinSaveTuple
 *
 *   	Save a tuple to a tmp file using a page buffer.
 *
 *	The first sizeof(long) bytes of the buffer hold the offset
 *	of the end of the used part of the page.  The tuple is
 *	copied into the buffer at position (or at the first tuple
 *	slot when position is NULL).  If it would not fit, the
 *	buffer is first appended to the file as one BLCKSZ page and
 *	then reused from the start.
 *
 *	Returns the LONGALIGNed position where the next tuple goes.
 *	Seek/write failures are reported with perror() only.
 * ----------------------------------------------------------------
 */
 
char *
ExecHashJoinSaveTuple(heapTuple, buffer, file, position)
    HeapTuple	heapTuple;
    char 	*buffer;
    File 	file;
    char 	*position;
{
    long	*usedBytes = (long*)buffer;
    char	*firstSlot = (char*)(buffer + sizeof(long));
    char	*limit = buffer + BLCKSZ;
    char	*cursor;
    int		rc;

    cursor = (position != NULL) ? position : firstSlot;

    if (!(cursor + heapTuple->t_len < limit)) {
	/* page is full: append it to the file and start a fresh page */
	rc = FileSeek(file, 0L, L_XTND);
	if (rc < 0)
	    perror("FileSeek");
	rc = FileWrite(file, buffer, BLCKSZ);
	NDirectFileWrite++;
	if (rc < 0)
	    perror("FileWrite");
	cursor = firstSlot;
	*usedBytes = 0;
    }

    /* copy the tuple in and record the new end of the used area */
    bcopy(heapTuple, cursor, heapTuple->t_len);
    cursor = (char*)LONGALIGN(cursor + heapTuple->t_len);
    *usedBytes = cursor - buffer;

    return cursor;
}
