/*-------------------------------------------------------------------------
 *
 * rtree.c--
 *    interface routines for the postgres rtree indexed access method.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $Header: /usr/local/devel/pglite/cvs/src/backend/access/rtree/rtree.c,v 1.17 1996/02/24 00:01:54 jolly Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/bufpage.h"

#include "utils/elog.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/excid.h"

#include "access/heapam.h"
#include "access/genam.h"
#include "access/rtree.h"
#include "access/rtscan.h"
#include "access/funcindex.h"
#include "access/tupdesc.h"

#include "nodes/execnodes.h"
#include "nodes/plannodes.h"

#include "executor/executor.h"
#include "executor/tuptable.h"

#include "catalog/index.h"

/*
 * SPLITVEC -- describes how the tuples of an overfull page are to be
 * divided between two pages.
 *
 * picksplit() fills this structure in and dosplit() consumes it.  The two
 * offset arrays are palloc'd by picksplit(), hold offset numbers in
 * ascending order, and are each terminated by a FirstOffsetNumber
 * sentinel entry.
 */
typedef struct SPLITVEC {
    OffsetNumber	*spl_left;	/* offsets assigned to the left page */
    int			spl_nleft;	/* number of entries in spl_left */
    char		*spl_ldatum;	/* union of the keys placed on the left */
    OffsetNumber	*spl_right;	/* offsets assigned to the right page */
    int			spl_nright;	/* number of entries in spl_right */
    char		*spl_rdatum;	/* union of the keys placed on the right */
} SPLITVEC;

/*
 * RTSTATE -- the support functions used while inserting into an rtree.
 *
 * initRtstate() fills this in from the index's support procedures
 * (RT_UNION_PROC, RT_SIZE_PROC, RT_INTER_PROC) so that the insertion
 * code can call them through plain function pointers.
 */
typedef struct RTSTATE {
    func_ptr unionFn;		/* union function */
    func_ptr sizeFn;		/* size function */
    func_ptr interFn;		/* intersection function */
} RTSTATE;

/* prototypes for the functions that are private to this file */
static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
				    RTSTATE *rtstate);
static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size,
		      RTSTATE *rtstate);
static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack,
				 IndexTuple itup, RTSTATE *rtstate);
static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
			IndexTuple rtup, RTSTATE *rtstate);
static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
		      RTSTATE *rtstate);
static void RTInitBuffer(Buffer b, uint32 f);
static OffsetNumber choose(Relation r, Page p, IndexTuple it,
			   RTSTATE *rtstate);
static int nospace(Page p, IndexTuple it);
static void initRtstate(RTSTATE *rtstate, Relation index);


/*
 *  rtbuild -- populate an rtree index from the contents of a heap.
 *
 *    Used both to build a brand-new index (predInfo->oldPred == NULL)
 *    and to extend an existing partial index (oldPred != NULL).  Every
 *    tuple returned by the heap scan is counted; those that pass the
 *    predicate checks are turned into index tuples and inserted with
 *    rtdoinsert().  Afterwards the heap and the index are closed and
 *    their statistics are updated.
 *
 *    heap, index   - relation being indexed and the index itself; both
 *                    are closed before this routine returns
 *    natts         - number of index attributes
 *    attnum, finfo - handed through to GetIndexValue()
 *    istrat, pcount, params - not referenced by this routine
 *    predInfo      - supplies the new (pred) and old (oldPred) predicates
 */
void
rtbuild(Relation heap,
        Relation index,
        int natts,
        AttrNumber *attnum,
        IndexStrategy istrat,
        uint16 pcount,
        Datum *params,
        FuncIndexInfo *finfo,
        PredInfo *predInfo)
{
    HeapScanDesc scan;
    /*
     * Start out invalid: when extending an existing index the root-page
     * branch below is skipped, and 'buffer' would otherwise be read
     * uninitialized by FillDummyExprContext().
     */
    Buffer buffer = InvalidBuffer;
    AttrNumber i;
    HeapTuple htup;
    IndexTuple itup;
    TupleDesc hd, id;
    InsertIndexResult res;
    Datum *d;
    bool *nulls;
    int nh, ni;
    ExprContext *econtext;
    TupleTable tupleTable;
    TupleTableSlot *slot;
    Oid hrelid, irelid;
    Node *pred, *oldPred;
    RTSTATE rtState;

    initRtstate(&rtState, index);

    /* rtrees only know how to do stupid locking now */
    RelationSetLockForWrite(index);

    pred = predInfo->pred;
    oldPred = predInfo->oldPred;

    /*
     *  We expect to be called exactly once for any index relation.
     *  If that's not the case, big trouble's what we have.
     */

    if (oldPred == NULL && RelationGetNumberOfBlocks(index) != 0)
        elog(WARN, "%s already contains data", index->rd_rel->relname.data);

    /* initialize the root page (if this is a new index) */
    if (oldPred == NULL) {
        buffer = ReadBuffer(index, P_NEW);
        RTInitBuffer(buffer, F_LEAF);
        WriteBuffer(buffer);
    }

    /* init the tuple descriptors and get set for a heap scan */
    hd = RelationGetTupleDescriptor(heap);
    id = RelationGetTupleDescriptor(index);
    d = (Datum *)palloc(natts * sizeof (*d));
    nulls = (bool *)palloc(natts * sizeof (*nulls));

    /*
     * If this is a predicate (partial) index, we will need to evaluate the
     * predicate using ExecQual, which requires the current tuple to be in a
     * slot of a TupleTable.  In addition, ExecQual must have an ExprContext
     * referring to that slot.  Here, we initialize dummy TupleTable and
     * ExprContext objects for this purpose. --Nels, Feb '92
     *
     * NOTE(review): for a new index 'buffer' still holds the number of the
     * root buffer that was already handed back by WriteBuffer() above;
     * whether FillDummyExprContext() cares should be confirmed.
     */
#ifndef OMIT_PARTIAL_INDEX
    if (pred != NULL || oldPred != NULL) {
        tupleTable = ExecCreateTupleTable(1);
        slot =  ExecAllocTableSlot(tupleTable);
        econtext = makeNode(ExprContext);
        FillDummyExprContext(econtext, slot, hd, buffer);
    }
#endif /* OMIT_PARTIAL_INDEX */
    scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL);
    htup = heap_getnext(scan, 0, &buffer);

    /* count the tuples as we insert them: nh = heap tuples, ni = indexed */
    nh = ni = 0;

    for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) {

        nh++;

        /*
         * If oldPred != NULL, this is an EXTEND INDEX command, so skip
         * this tuple if it was already in the existing partial index
         * (it still counts as an indexed tuple).
         */
        if (oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
            /*SetSlotContents(slot, htup); */
            slot->val = htup;
            if (ExecQual((List*)oldPred, econtext) == true) {
                ni++;
                continue;
            }
#endif /* OMIT_PARTIAL_INDEX */
        }

        /* Skip this tuple if it doesn't satisfy the partial-index predicate */
        if (pred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
            /*SetSlotContents(slot, htup); */
            slot->val = htup;
            if (ExecQual((List*)pred, econtext) == false)
                continue;
#endif /* OMIT_PARTIAL_INDEX */
        }

        ni++;

        /*
         *  For the current heap tuple, extract all the attributes
         *  we use in this index, and note which are null.
         */

        for (i = 1; i <= natts; i++) {
            int  attoff;
            bool attnull;

            /*
             *  Offsets are from the start of the tuple, and are
             *  zero-based; indices are one-based.  The next call
             *  returns i - 1.  That's data hiding for you.
             */

            attoff = AttrNumberGetAttrOffset(i);
            /*
              d[attoff] = HeapTupleGetAttributeValue(htup, buffer,
              */
            d[attoff] = GetIndexValue(htup,
                                      hd,
                                      attoff,
                                      attnum,
                                      finfo,
                                      &attnull,
                                      buffer);
            /* null flags are stored as characters: 'n' = null, ' ' = not */
            nulls[attoff] = (attnull ? 'n' : ' ');
        }

        /* form an index tuple and point it at the heap tuple */
        itup = index_formtuple(id, &d[0], nulls);
        itup->t_tid = htup->t_ctid;

        /*
         *  Since we already have the index relation locked, we
         *  call rtdoinsert directly.  Normal access method calls
         *  dispatch through rtinsert, which locks the relation
         *  for write.  This is the right thing to do if you're
         *  inserting single tups, but not when you're initializing
         *  the whole index at once.
         */

        res = rtdoinsert(index, itup, &rtState);
        pfree(itup);
        pfree(res);
    }

    /* okay, all heap tuples are indexed */
    heap_endscan(scan);
    RelationUnsetLockForWrite(index);

    if (pred != NULL || oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
        ExecDestroyTupleTable(tupleTable, true);
        pfree(econtext);
#endif /* OMIT_PARTIAL_INDEX */
    }

    /*
     *  Since we just counted the tuples in the heap, we update its
     *  stats in pg_relation to guarantee that the planner takes
     *  advantage of the index we just created.  UpdateStats() does a
     *  CommandCounterIncrement(), which flushes changed entries from
     *  the system relcache.  The act of constructing an index changes
     *  these heap and index tuples in the system catalogs, so they
     *  need to be flushed.  We close them to guarantee that they
     *  will be.
     */

    hrelid = heap->rd_id;
    irelid = index->rd_id;
    heap_close(heap);
    index_close(index);

    UpdateStats(hrelid, nh, true);
    UpdateStats(irelid, ni, false);

    if (oldPred != NULL) {
        /* every heap tuple is indexed, so the index is no longer partial */
        if (ni == nh) pred = NULL;
        UpdateIndexPredicate(irelid, oldPred, pred);
    }

    /* be tidy */
    pfree(nulls);
    pfree(d);
}

/*
 *  rtinsert -- public entry point for inserting one tuple into an rtree.
 *
 *    Looks up the support functions for the index, takes the write lock
 *    on the relation and lets rtdoinsert() do the real work.  The lock is
 *    deliberately not released here (two-phase locking: it is held until
 *    end of transaction).
 */
InsertIndexResult
rtinsert(Relation r, IndexTuple itup)
{
    RTSTATE support;

    initRtstate(&support, r);
    RelationSetLockForWrite(r);

    /* XXX two-phase locking -- don't unlock the relation until EOT */
    return (rtdoinsert(r, itup, &support));
}

/*
 *  rtdoinsert -- insert an index tuple, assuming the caller holds the lock.
 *
 *    Walks from the root down to a leaf, using choose() at every internal
 *    page and remembering the path in an RTSTACK.  If the leaf has no
 *    room the page is split via dosplit(); otherwise the tuple is appended
 *    and the bounding keys on the path are enlarged via rttighten().
 *    Returns a palloc'd InsertIndexResult.
 */
static InsertIndexResult
rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
{
    BlockNumber curblk = P_ROOT;
    Buffer curbuf = InvalidBuffer;
    RTSTACK *path = (RTSTACK *) NULL;
    Page curpage;
    RTreePageOpaque special;
    OffsetNumber target;
    OffsetNumber placed;
    InsertIndexResult result;

    /* descend until we are looking at a leaf page */
    for (;;) {
        RTSTACK *frame;
        IndexTuple downlink;

        /* let go of the page we just left before pinning the next one */
        if (curbuf != InvalidBuffer)
            ReleaseBuffer(curbuf);

        curbuf = ReadBuffer(r, curblk);
        curpage = (Page) BufferGetPage(curbuf);
        special = (RTreePageOpaque) PageGetSpecialPointer(curpage);

        if (special->flags & F_LEAF)
            break;

        /* internal page: record it on the path and follow the best entry */
        frame = (RTSTACK *) palloc(sizeof(RTSTACK));
        frame->rts_parent = path;
        frame->rts_blk = curblk;
        frame->rts_child = choose(r, curpage, itup, rtstate);
        path = frame;

        downlink = (IndexTuple) PageGetItem(curpage,
                                            PageGetItemId(curpage,
                                                          frame->rts_child));
        curblk = ItemPointerGetBlockNumber(&(downlink->t_tid));
    }

    /* no room on the leaf: split it, which also places the new tuple */
    if (nospace(curpage, itup)) {
        result = dosplit(r, curbuf, path, itup, rtstate);
        freestack(path);
        WriteBuffer(curbuf);  /* don't forget to release buffer! */
        return (result);
    }

    /* the tuple goes into the first slot after the current last item */
    if (PageIsEmpty(curpage))
        target = FirstOffsetNumber;
    else
        target = OffsetNumberNext(PageGetMaxOffsetNumber(curpage));

    placed = PageAddItem(curpage, (Item) itup, IndexTupleSize(itup),
                         target, LP_USED);
    WriteBuffer(curbuf);

    /* the key follows the tuple header; grow the parents' boxes to cover it */
    rttighten(r, path, ((char *) itup) + sizeof(IndexTupleData),
              (IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate);
    freestack(path);

    /* report where the tuple ended up */
    result = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
    ItemPointerSet(&(result->pointerData), curblk, placed);

    return (result);
}

/*
 *  rttighten -- adjust the bounding key of a parent entry after a change
 *  below it.
 *
 *    stk names the parent page and the entry on it that points at the
 *    child which changed; datum is the key that must now be covered.
 *    The parent's key is replaced by the union of itself and datum if
 *    (and only if) the size function reports a different size for that
 *    union, and in that case the adjustment continues one level up.
 *    att_size is the number of key bytes to copy for fixed-length keys.
 */
static void
rttighten(Relation r,
          RTSTACK *stk,
          char *datum,
          int att_size,
          RTSTATE *rtstate)
{
    Buffer parentbuf;
    Page parentpage;
    char *curkey;
    char *merged;
    char *boxed;
    float before, after;
    TupleDesc desc;

    /* ran off the top of the tree: nothing left to adjust */
    if (stk == (RTSTACK *) NULL)
        return;

    parentbuf = ReadBuffer(r, stk->rts_blk);
    parentpage = BufferGetPage(parentbuf);

    /* the key lives right behind the index tuple header */
    curkey = ((char *) PageGetItem(parentpage,
                                   PageGetItemId(parentpage, stk->rts_child)))
        + sizeof(IndexTupleData);

    (*rtstate->sizeFn)(curkey, &before);
    merged = (char *) (*rtstate->unionFn)(curkey, datum);
    (*rtstate->sizeFn)(merged, &after);

    if (after == before) {
        /* the existing key already has the right size; leave the page alone */
        ReleaseBuffer(parentbuf);
        pfree(merged);
        return;
    }

    desc = RelationGetTupleDescriptor(r);
    if (desc->attrs[0]->attlen < 0) {
        /*
         * Variable-length key type.  This is an internal page, so the
         * key being replaced had better be a union (constant-length)
         * key as well.
         */
        Assert(VARSIZE(merged) == VARSIZE(curkey));
        memmove(curkey, merged, VARSIZE(merged));
    } else {
        memmove(curkey, merged, att_size);
    }
    WriteBuffer(parentbuf);

    /*
     *  The index may be defined on variable-sized data (like polygons).
     *  Internal pages need a constant-sized datum, which we obtain by
     *  running the key through the union proc once more; that proc is
     *  guaranteed to return a rectangle.
     */
    boxed = (char *) (*rtstate->unionFn)(merged, merged);
    rttighten(r, stk->rts_parent, boxed, att_size, rtstate);
    pfree(boxed);
    pfree(merged);
}

/*
 *  dosplit -- split a page in the tree.
 *
 *    This is the quadratic-cost split algorithm Guttman describes in
 *    his paper.  The reason we chose it is that you can implement this
 *    with less information about the data types on which you're operating.
 *
 *    buffer holds the overfull page, stack is the path from the root to
 *    its parent, and itup is the tuple that did not fit.  picksplit()
 *    decides the distribution; the existing tuples and itup are then
 *    copied onto a left and a right page, active scans are adjusted, and
 *    pointers to both pages are installed in the parent by rtintinsert().
 *
 *    The caller keeps its own pin on buffer and must release it.
 *    Returns a palloc'd InsertIndexResult naming the slot where itup
 *    was placed.
 */
static InsertIndexResult
dosplit(Relation r,
        Buffer buffer,
        RTSTACK *stack,
        IndexTuple itup,
        RTSTATE *rtstate)
{
    Page p;
    Buffer leftbuf, rightbuf;
    Page left, right;
    ItemId itemid;
    IndexTuple item;
    IndexTuple ltup, rtup;
    OffsetNumber maxoff;
    OffsetNumber i;
    OffsetNumber leftoff, rightoff;
    BlockNumber lbknum, rbknum;
    BlockNumber bufblock;
    RTreePageOpaque opaque;
    int blank;
    InsertIndexResult res;
    char *isnull;
    SPLITVEC v;
    OffsetNumber *leftvec, *rightvec;
    TupleDesc tupDesc;

    /* null-flag array for index_formtuple: ' ' marks "not null" */
    isnull = (char *) palloc(r->rd_rel->relnatts);
    for (blank = 0; blank < r->rd_rel->relnatts; blank++)
        isnull[blank] = ' ';
    p = (Page) BufferGetPage(buffer);
    opaque = (RTreePageOpaque) PageGetSpecialPointer(p);

    /*
     *  The root of the tree is the first block in the relation.  If
     *  we're about to split the root, we need to do some hocus-pocus
     *  to enforce this guarantee: both halves go to brand-new pages and
     *  the root block itself is rebuilt later (see rtnewroot()).
     */

    if (BufferGetBlockNumber(buffer) == P_ROOT) {
        leftbuf = ReadBuffer(r, P_NEW);
        RTInitBuffer(leftbuf, opaque->flags);
        lbknum = BufferGetBlockNumber(leftbuf);
        left = (Page) BufferGetPage(leftbuf);
    } else {
        /*
         * The left half reuses the original block.  It is assembled on a
         * temporary page and copied back once the split is complete.
         */
        leftbuf = buffer;
        IncrBufferRefCount(buffer);
        lbknum = BufferGetBlockNumber(buffer);
        left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
    }

    rightbuf = ReadBuffer(r, P_NEW);
    RTInitBuffer(rightbuf, opaque->flags);
    rbknum = BufferGetBlockNumber(rightbuf);
    right = (Page) BufferGetPage(rightbuf);

    picksplit(r, p, &v, itup, rtstate);

    /*
     * The vector pointers are advanced while merging below, so remember
     * where the arrays start in order to free them afterwards.
     */
    leftvec = v.spl_left;
    rightvec = v.spl_right;

    /*
     * Both split vectors are in ascending offset order, so one pass over
     * the old page suffices: an item goes left if it is the next entry
     * of the left vector, and right otherwise.
     */
    leftoff = rightoff = FirstOffsetNumber;
    maxoff = PageGetMaxOffsetNumber(p);
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
        itemid = PageGetItemId(p, i);
        item = (IndexTuple) PageGetItem(p, itemid);

        if (i == *(v.spl_left)) {
            (void) PageAddItem(left, (Item) item, IndexTupleSize(item),
                               leftoff, LP_USED);
            leftoff = OffsetNumberNext(leftoff);
            v.spl_left++;	/* advance in left split vector */
        } else {
            (void) PageAddItem(right, (Item) item, IndexTupleSize(item),
                               rightoff, LP_USED);
            rightoff = OffsetNumberNext(rightoff);
            v.spl_right++;	/* advance in right split vector */
        }
    }

    /* build an InsertIndexResult for this insertion */
    res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));

    /*
     * Now insert the new index tuple.  If the left vector has not yet
     * reached its FirstOffsetNumber sentinel, its remaining entry is the
     * new tuple; otherwise the new tuple belongs on the right.  The
     * result must name the slot the tuple was actually stored in, so it
     * is recorded using the offset that was passed to PageAddItem.
     */
    if (*(v.spl_left) != FirstOffsetNumber) {
        (void) PageAddItem(left, (Item) itup, IndexTupleSize(itup),
                           leftoff, LP_USED);
        ItemPointerSet(&(res->pointerData), lbknum, leftoff);
    } else {
        (void) PageAddItem(right, (Item) itup, IndexTupleSize(itup),
                           rightoff, LP_USED);
        ItemPointerSet(&(res->pointerData), rbknum, rightoff);
    }

    /* the split vectors have served their purpose */
    pfree(leftvec);
    pfree(rightvec);

    if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT) {
        PageRestoreTempPage(left, p);
    }
    WriteBuffer(leftbuf);
    WriteBuffer(rightbuf);

    /*
     *  Okay, the page is split.  We have three things left to do:
     *
     *    1)  Adjust any active scans on this index to cope with changes
     *        we introduced in its structure by splitting this page.
     *
     *    2)  "Tighten" the bounding box of the pointer to the left
     *        page in the parent node in the tree, if any.  Since we
     *        moved a bunch of stuff off the left page, we expect it
     *        to get smaller.  This happens in the internal insertion
     *        routine.
     *
     *    3)  Insert a pointer to the right page in the parent.  This
     *        may cause the parent to split.  If it does, we need to
     *        repeat steps one and two for each split node in the tree.
     */

    /* adjust active scans */
    rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);

    tupDesc = r->rd_att;
    ltup = (IndexTuple) index_formtuple(tupDesc,
                                        (Datum *) &(v.spl_ldatum), isnull);
    rtup = (IndexTuple) index_formtuple(tupDesc,
                                        (Datum *) &(v.spl_rdatum), isnull);
    pfree(isnull);

    /*
     * The keys are now stored inside ltup/rtup, so the union datums that
     * picksplit() allocated are no longer needed.
     */
    pfree(v.spl_ldatum);
    pfree(v.spl_rdatum);

    /* set pointers to new child pages in the internal index tuples */
    ItemPointerSet(&(ltup->t_tid), lbknum, 1);
    ItemPointerSet(&(rtup->t_tid), rbknum, 1);

    rtintinsert(r, stack, ltup, rtup, rtstate);

    pfree(ltup);
    pfree(rtup);

    return (res);
}

/*
 *  rtintinsert -- install the results of a page split in the parent.
 *
 *    ltup and rtup are the internal tuples pointing at the left and
 *    right halves of a page that was just split; stk identifies the
 *    parent page and the entry on it that pointed at the page before the
 *    split.  With no parent (stk == NULL) a new root is built instead.
 *    Otherwise ltup overwrites the old entry and rtup is added to the
 *    parent, splitting the parent in turn if it has no room.
 */
static void
rtintinsert(Relation r,
            RTSTACK *stk,
            IndexTuple ltup,
            IndexTuple rtup,
            RTSTATE *rtstate)
{
    Buffer parentbuf;
    Page parentpage;
    IndexTuple downlink;

    /* the page that was split was the root */
    if (stk == (RTSTACK *) NULL) {
        rtnewroot(r, ltup, rtup);
        return;
    }

    parentbuf = ReadBuffer(r, stk->rts_blk);
    parentpage = BufferGetPage(parentbuf);
    downlink = (IndexTuple) PageGetItem(parentpage,
                                        PageGetItemId(parentpage,
                                                      stk->rts_child));

    /*
     *  This is a hack.  Right now, rtree keys are forced to be constant
     *  size.  Lifting that restriction would require deleting the old
     *  key and adding both the left and the right key for the two new
     *  pages; inserting the left key could then itself force a split if
     *  it is bigger than the key it replaces.
     */
    if (IndexTupleSize(downlink) != IndexTupleSize(ltup))
        elog(WARN, "Variable-length rtree keys are not supported.");

    /* install pointer to left child by overwriting the old entry in place */
    memmove(downlink, ltup, IndexTupleSize(ltup));

    if (!nospace(parentpage, rtup)) {
        char *leftkey;
        char *rightkey;
        char *bothkeys;

        /*
         * NOTE(review): the offset handed to PageAddItem is the current
         * maximum offset, not the one following it; what that means for
         * the existing last item depends on PageAddItem -- confirm.
         */
        (void) PageAddItem(parentpage, (Item) rtup, IndexTupleSize(rtup),
                           PageGetMaxOffsetNumber(parentpage), LP_USED);
        WriteBuffer(parentbuf);

        /* propagate the union of both halves to the grandparent */
        leftkey = ((char *) ltup) + sizeof(IndexTupleData);
        rightkey = ((char *) rtup) + sizeof(IndexTupleData);
        bothkeys = (char *) (*rtstate->unionFn)(leftkey, rightkey);

        rttighten(r, stk->rts_parent, bothkeys,
                  (IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate);

        pfree(bothkeys);
    } else {
        InsertIndexResult splitres;

        /* parent is full as well: adjust upwards for ltup, then split it */
        rttighten(r, stk->rts_parent,
                  ((char *) ltup) + sizeof(IndexTupleData),
                  (IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate);
        splitres = dosplit(r, parentbuf, stk->rts_parent, rtup, rtstate);
        WriteBuffer(parentbuf);  /* don't forget to release buffer!  - 01/31/94 */
        pfree(splitres);
    }
}

/*
 *  rtnewroot -- rebuild the root page after the old root was split.
 *
 *    Re-initializes block P_ROOT as a non-leaf page (flags 0) whose only
 *    two entries are lt and rt, the pointers to the two halves.
 */
static void
rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
{
    Buffer rootbuf;
    Page rootpage;
    OffsetNumber slot;

    rootbuf = ReadBuffer(r, P_ROOT);
    RTInitBuffer(rootbuf, 0);
    rootpage = BufferGetPage(rootbuf);

    slot = FirstOffsetNumber;
    (void) PageAddItem(rootpage, (Item) lt, IndexTupleSize(lt),
                       slot, LP_USED);

    slot = OffsetNumberNext(FirstOffsetNumber);
    (void) PageAddItem(rootpage, (Item) rt, IndexTupleSize(rt),
                       slot, LP_USED);

    WriteBuffer(rootbuf);
}

/*
 *  picksplit -- decide how the tuples on a full page, plus one new
 *  tuple, are to be distributed over two pages.
 *
 *    First every pair of existing tuples is examined and the pair whose
 *    union wastes the most space (union size minus intersection size)
 *    is chosen as the two seeds.  Then each remaining tuple, and finally
 *    the new tuple itup (treated as offset maxoff + 1), is assigned to
 *    whichever side needs the smaller enlargement to take it in.
 *
 *    On return v holds two palloc'd offset arrays in ascending order,
 *    each terminated by a FirstOffsetNumber sentinel, their entry counts,
 *    and the union key of each side.
 */
static void
picksplit(Relation r,
          Page page,
          SPLITVEC *v,
          IndexTuple itup,
          RTSTATE *rtstate)
{
    OffsetNumber maxoff;
    OffsetNumber i, j;
    IndexTuple item_1, item_2;
    char *datum_alpha, *datum_beta;
    char *datum_l, *datum_r;
    char *union_d, *union_dl, *union_dr;
    char *inter_d;
    bool firsttime;
    float size_alpha, size_beta, size_union, size_inter;
    float size_waste, waste;
    float size_l, size_r;
    int nbytes;
    OffsetNumber seed_1 = 0, seed_2 = 0;
    OffsetNumber *left, *right;

    maxoff = PageGetMaxOffsetNumber(page);

    /* room for every existing item, the new item and the sentinel */
    nbytes = (maxoff + 2) * sizeof(OffsetNumber);
    v->spl_left = (OffsetNumber *) palloc(nbytes);
    v->spl_right = (OffsetNumber *) palloc(nbytes);

    firsttime = true;
    waste = 0.0;

    /* seed selection: look at every pair (i, j) with i < j */
    for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) {
        item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
        datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
        for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) {
            item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
            datum_beta = ((char *) item_2) + sizeof(IndexTupleData);

            /* compute the wasted space by unioning these guys */
            union_d = (char *)(*rtstate->unionFn)(datum_alpha, datum_beta);
            (*rtstate->sizeFn)(union_d, &size_union);
            inter_d = (char *)(*rtstate->interFn)(datum_alpha, datum_beta);
            (*rtstate->sizeFn)(inter_d, &size_inter);
            size_waste = size_union - size_inter;

            pfree(union_d);

            /* the intersection function may hand back NULL */
            if (inter_d != (char *) NULL)
                pfree(inter_d);

            /*
             *  are these a more promising split than what we've
             *  already seen?
             */

            if (size_waste > waste || firsttime) {
                waste = size_waste;
                seed_1 = i;
                seed_2 = j;
                firsttime = false;
            }
        }
    }

    left = v->spl_left;
    v->spl_nleft = 0;
    right = v->spl_right;
    v->spl_nright = 0;

    /*
     * Start each side's running union from its seed.  Unioning a key
     * with itself yields a private, palloc'd copy that can be freed and
     * replaced as the side grows.
     */
    item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
    datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
    datum_l = (char *)(*rtstate->unionFn)(datum_alpha, datum_alpha);
    (*rtstate->sizeFn)(datum_l, &size_l);
    item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
    datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
    datum_r = (char *)(*rtstate->unionFn)(datum_beta, datum_beta);
    (*rtstate->sizeFn)(datum_r, &size_r);

    /*
     *  Now split up the regions between the two seeds.  An important
     *  property of this split algorithm is that the split vector v
     *  has the indices of items to be split in order in its left and
     *  right vectors.  We exploit this property by doing a merge in
     *  the code that actually splits the page.
     *
     *  For efficiency, we also place the new index tuple in this loop.
     *  This is handled at the very end, when we have placed all the
     *  existing tuples and i == maxoff + 1.
     */

    maxoff = OffsetNumberNext(maxoff);
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {

        /*
         *  If we've already decided where to place this item, just
         *  put it on the right list.  Otherwise, we need to figure
         *  out which page needs the least enlargement in order to
         *  store the item.
         */

        if (i == seed_1) {
            *left++ = i;
            v->spl_nleft++;
            continue;
        } else if (i == seed_2) {
            *right++ = i;
            v->spl_nright++;
            continue;
        }

        /* okay, which page needs least enlargement? */
        if (i == maxoff) {
            item_1 = itup;
        } else {
            item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
        }

        datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
        union_dl = (char *)(*rtstate->unionFn)(datum_l, datum_alpha);
        union_dr = (char *)(*rtstate->unionFn)(datum_r, datum_alpha);
        (*rtstate->sizeFn)(union_dl, &size_alpha);
        (*rtstate->sizeFn)(union_dr, &size_beta);

        /* pick which page to add it to */
        if (size_alpha - size_l < size_beta - size_r) {
            pfree(datum_l);
            pfree(union_dr);
            datum_l = union_dl;
            size_l = size_alpha;
            *left++ = i;
            v->spl_nleft++;
        } else {
            pfree(datum_r);
            pfree(union_dl);
            datum_r = union_dr;
            /*
             * size_beta is the size of union_dr, which is what datum_r
             * now is.  (Recording size_alpha here would make every later
             * enlargement estimate for the right side wrong.)
             */
            size_r = size_beta;
            *right++ = i;
            v->spl_nright++;
        }
    }
    *left = *right = FirstOffsetNumber;	/* sentinel value, see dosplit() */

    v->spl_ldatum = datum_l;
    v->spl_rdatum = datum_r;
}

/*
 *  RTInitBuffer -- turn the page in buffer b into an empty rtree page.
 *
 *    The whole page is zeroed, initialized with room for the rtree
 *    special area, and the special area's flags are set to f.
 */
static void
RTInitBuffer(Buffer b, uint32 f)
{
    Size nbytes = BufferGetPageSize(b);
    Page pg = BufferGetPage(b);
    RTreePageOpaque special;

    /* wipe whatever was there, then lay out a fresh page */
    memset(pg, 0, (int) nbytes);
    PageInit(pg, nbytes, sizeof(RTreePageOpaqueData));

    special = (RTreePageOpaque) PageGetSpecialPointer(pg);
    special->flags = f;
}

/*
 *  choose -- pick the entry on page p under which tuple 'it' should go.
 *
 *    For each entry the growth of its key, were it unioned with the new
 *    key, is measured with the size function; the entry with the least
 *    growth wins, the earliest one on ties.  The scan stops early at an
 *    entry that would not grow at all.  Returns that entry's offset.
 */
static OffsetNumber
choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
{
    char *newkey = ((char *) it) + sizeof(IndexTupleData);
    OffsetNumber last = PageGetMaxOffsetNumber(p);
    OffsetNumber best = -1;
    float best_growth = -1.0;	/* negative means "nothing chosen yet" */
    OffsetNumber off;

    for (off = FirstOffsetNumber; off <= last; off = OffsetNumberNext(off)) {
        char *key;
        char *merged;
        float keysize, mergedsize;

        key = ((char *) PageGetItem(p, PageGetItemId(p, off)))
            + sizeof(IndexTupleData);

        (*rtstate->sizeFn)(key, &keysize);
        merged = (char *) (*rtstate->unionFn)(key, newkey);
        (*rtstate->sizeFn)(merged, &mergedsize);
        pfree(merged);

        if (best_growth < 0 || mergedsize - keysize < best_growth) {
            best = off;
            best_growth = mergedsize - keysize;

            /* cannot do better than no growth at all */
            if (best_growth == 0)
                break;
        }
    }

    return (best);
}

/*
 *  nospace -- report whether tuple 'it' is too big for the free space
 *  left on page p.  Returns 1 if it does not fit, 0 if it does.
 */
static int
nospace(Page p, IndexTuple it)
{
    if (PageGetFreeSpace(p) < IndexTupleSize(it))
        return (1);

    return (0);
}

/*
 *  freestack -- release every entry of an RTSTACK, starting at s and
 *  following the rts_parent links.  A NULL stack is fine.
 */
void
freestack(RTSTACK *s)
{
    RTSTACK *above;

    for (; s != (RTSTACK *) NULL; s = above) {
        /* fetch the link before the entry goes away */
        above = s->rts_parent;
        pfree(s);
    }
}

/*
 *  rtdelete -- remove the index tuple stored at tid from rtree r.
 *
 *    Takes the write lock (kept until end of transaction), tells any
 *    active scans about the deletion, then deletes the tuple from its
 *    page.  Always returns NULL.
 */
char *
rtdelete(Relation r, ItemPointer tid)
{
    BlockNumber victimblk;
    OffsetNumber victimoff;
    Buffer victimbuf;
    Page victimpage;

    /* must write-lock on delete */
    RelationSetLockForWrite(r);

    victimblk = ItemPointerGetBlockNumber(tid);
    victimoff = ItemPointerGetOffsetNumber(tid);

    /* scans positioned on this page have to learn about the removal first */
    rtadjscans(r, RTOP_DEL, victimblk, victimoff);

    /* now take the tuple off its page */
    victimbuf = ReadBuffer(r, victimblk);
    victimpage = BufferGetPage(victimbuf);
    PageIndexTupleDelete(victimpage, victimoff);
    WriteBuffer(victimbuf);

    /* XXX -- two-phase locking, don't release the write lock */
    return ((char *) NULL);
}

/*
 *  initRtstate -- look up the union, size and intersection support
 *  procedures of the index's first attribute and store their function
 *  pointers in *rtstate.
 */
static void initRtstate(RTSTATE *rtstate, Relation index)
{
    RegProcedure unionId, sizeId, interId;
    int nargs;	/* filled in by fmgr_info, not used here */

    /* resolve all three procedure ids first ... */
    unionId = index_getprocid(index, 1, RT_UNION_PROC);
    sizeId = index_getprocid(index, 1, RT_SIZE_PROC);
    interId = index_getprocid(index, 1, RT_INTER_PROC);

    /* ... then turn each one into a callable function pointer */
    fmgr_info(unionId, &rtstate->unionFn, &nargs);
    fmgr_info(sizeId, &rtstate->sizeFn, &nargs);
    fmgr_info(interId, &rtstate->interFn, &nargs);
}

#define RTDEBUG
#ifdef RTDEBUG
#include "utils/geo-decls.h"

/*
 *  _rtdump -- debugging aid: print every page of rtree r to stdout.
 *
 *    For each block a header line with the item count and the page kind
 *    is printed, followed by one line per index tuple.  Keys are
 *    formatted with box_out(), i.e. they are treated as BOX values.
 */
void
_rtdump(Relation r)
{
    BlockNumber total = RelationGetNumberOfBlocks(r);
    BlockNumber cur;

    for (cur = 0; cur < total; cur++) {
        Buffer pagebuf = ReadBuffer(r, cur);
        Page pg = BufferGetPage(pagebuf);
        RTreePageOpaque special = (RTreePageOpaque) PageGetSpecialPointer(pg);
        OffsetNumber last = PageGetMaxOffsetNumber(pg);
        OffsetNumber off;

        printf("Page %d maxoff %d <%s>\n", cur, last,
               (special->flags & F_LEAF ? "LEAF" : "INTERNAL"));

        /* an empty page has no tuples to list */
        if (!PageIsEmpty(pg)) {
            for (off = FirstOffsetNumber;
                 off <= last;
                 off = OffsetNumberNext(off)) {
                IndexTuple tup;
                BlockNumber refblk;
                OffsetNumber refoff;
                char *keytext;

                tup = (IndexTuple) PageGetItem(pg, PageGetItemId(pg, off));
                refblk = ItemPointerGetBlockNumber(&(tup->t_tid));
                refoff = ItemPointerGetOffsetNumber(&(tup->t_tid));

                /* the key follows the tuple header */
                keytext = (char *) box_out((BOX *) (((char *) tup)
                                                    + sizeof(IndexTupleData)));
                printf("\t[%d] size %d heap <%d,%d> key:%s\n",
                       off, IndexTupleSize(tup), refblk, refoff, keytext);
                pfree(keytext);
            }
        }

        ReleaseBuffer(pagebuf);
    }
}
#endif /* defined RTDEBUG */

