head	1.6;
access;
symbols;
locks; strict;
comment	@ * @;


1.6
date	91.06.26.19.12.14;	author mao;	state Exp;
branches;
next	1.5;

1.5
date	91.06.12.08.57.36;	author mao;	state Exp;
branches;
next	1.4;

1.4
date	91.06.12.03.56.58;	author mao;	state Exp;
branches;
next	1.3;

1.3
date	91.06.11.07.41.44;	author mao;	state Exp;
branches;
next	1.2;

1.2
date	91.06.10.21.05.53;	author mao;	state Exp;
branches;
next	1.1;

1.1
date	91.05.28.14.34.15;	author mao;	state Exp;
branches;
next	;


desc
@no-overwrite btree item search
@


1.6
log
@compile-time switch for including nobtree code
@
text
@/*
 *  btsearch.c -- search code for postgres btrees.
 */

#include "tmp/c.h"

#ifdef NOBTREE
#include "tmp/postgres.h"

#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/page.h"

#include "utils/log.h"
#include "utils/rel.h"
#include "utils/excid.h"

#include "access/heapam.h"
#include "access/genam.h"
#include "access/ftup.h"
#include "access/skey.h"
#include "access/sdir.h"
#include "access/tupmacs.h"
#include "access/nobtree.h"

RcsId("$Header: /users/mao/nobtree/RCS/nobtsearch.c,v 1.5 1991/06/12 08:57:36 mao Exp $");

/*
 *  _nobt_search() -- Search for a scan key in the index.
 *
 *	This routine is actually just a helper that sets things up and
 *	calls a recursive-descent search routine on the tree.
 */

NOBTStack
_nobt_search(rel, keysz, scankey, bufP)
    Relation rel;
    int keysz;
    ScanKey scankey;
    Buffer *bufP;
{
    *bufP = _nobt_getroot(rel, NOBT_READ);
    return (_nobt_searchr(rel, keysz, scankey, bufP, (NOBTStack) NULL));
}

/*
 *  _nobt_searchr() -- Search the tree recursively for a particular scankey.
 *
 *	This routine takes a pointer to a sequence number.  As we descend
 *	the tree, any time we see a key exactly equal to the one being
 *	inserted, we save its sequence number plus one.  If we are inserting
 *	a tuple, then we can use this number to distinguish among duplicates
 *	and to guarantee that we never store the same sequence number for the
 *	same key at any level in the tree.  This is an important guarantee,
 *	since the Lehman and Yao algorithm relies on being able to find
 *	its place in parent pages by looking at keys in the parent.
 */

NOBTStack
_nobt_searchr(rel, keysz, scankey, bufP, stack_in)
    Relation rel;
    int keysz;
    ScanKey scankey;
    Buffer *bufP;
    NOBTStack stack_in;
{
    NOBTStack stack;
    OffsetIndex offind;
    Page page;
    NOBTPageOpaque opaque;
    PageNumber newpg;
    BlockNumber par_blkno;
    BlockNumber blkno;
    ItemId itemid;
    NOBTIItem btitem;
    NOBTIItem item_save;
    int item_nbytes;

    page = BufferGetPage(*bufP, 0);
    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

    /* if this is a leaf page, we're done */
    if (opaque->nobtpo_flags & NOBTP_LEAF)
	return (stack_in);

    /*
     *  Find the appropriate item on the internal page, and get the child
     *  page that it points to.
     */

    par_blkno = BufferGetBlockNumber(*bufP);
    offind = _nobt_binsrch(rel, *bufP, keysz, scankey, NOBT_DESCENT);
    itemid = PageGetItemId(page, offind);
    btitem = (NOBTIItem) PageGetItem(page, itemid);
    blkno = btitem->nobtii_child;

    /* save current, next key on stack */
    item_nbytes = ItemIdGetLength(itemid);
    item_save = (NOBTIItem) palloc(item_nbytes);
    bcopy((char *) btitem, (char *) item_save, item_nbytes);
    stack = (NOBTStack) palloc(sizeof(NOBTStackData));
    stack->nobts_offset = offind;
    stack->nobts_blkno = par_blkno;
    stack->nobts_btitem = item_save;

#ifndef	NORMAL
    /* if there's no "next" key on this page, use the high key */
    if (offind++ >= PageGetMaxOffsetIndex(page)) {
	if (opaque->nobtpo_next == P_NONE) {
	    stack->nobts_nxtitem = (NOBTIItem) NULL;
	} else {
	    itemid = PageGetItemId(page, 0);
	    item_nbytes = ItemIdGetLength(itemid);
	    item_save = (NOBTIItem) palloc(item_nbytes);
	    btitem = (NOBTIItem) PageGetItem(page, itemid);
	    bcopy((char *) btitem, (char *) item_save, item_nbytes);
	    stack->nobts_nxtitem = item_save;
	}
    } else {
	itemid = PageGetItemId(page, offind);
	item_nbytes = ItemIdGetLength(itemid);
	item_save = (NOBTIItem) palloc(item_nbytes);
	btitem = (NOBTIItem) PageGetItem(page, itemid);
	bcopy((char *) btitem, (char *) item_save, item_nbytes);
	stack->nobts_nxtitem = item_save;
    }
#endif	/* ndef NORMAL */

    stack->nobts_parent = stack_in;

    /* drop the read lock on the parent page and acquire one on the child */
    _nobt_relbuf(rel, *bufP, NOBT_READ);
    *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);

    /*
     *  Race -- the page we just grabbed may have split since we read its
     *  pointer in the parent.  If it has, we may need to move right to its
     *  new sibling.  Do that.
     */

    *bufP = _nobt_moveright(rel, *bufP, keysz, scankey, NOBT_READ, stack);

    /* okay, all set to move down a level */
    return (_nobt_searchr(rel, keysz, scankey, bufP, stack));
}

/*
 *  _nobt_moveright() -- move right in the btree if necessary.
 *
 *	When we drop and reacquire a pointer to a page, it is possible that
 *	the page has changed in the meanwhile.  If this happens, we're
 *	guaranteed that the page has "split right" -- that is, that any
 *	data that appeared on the page originally is either on the page
 *	or strictly to the right of it.
 *
 *	This routine decides whether or not we need to move right in the
 *	tree by examining the high key entry on the page.  If that entry
 *	is strictly less than one we expect to be on the page, then our
 *	picture of the page is incorrect and we need to move right.
 *
 *	On entry, we have the buffer pinned and a lock of the proper type.
 *	If we move right, we release the buffer and lock and acquire the
 *	same on the right sibling.
 */

Buffer
_nobt_moveright(rel, buf, keysz, scankey, access, stack)
    Relation rel;
    Buffer buf;
    int keysz;
    ScanKey scankey;
    int access;
    NOBTStack stack;
{
    Page page;
    PageNumber right;
    PageNumber newpg;
    NOBTPageOpaque opaque;
    ItemId hikey;
    ItemId itemid;
    BlockNumber rblkno;
    IndexTuple stktupa, chktupa;
    IndexTuple stktupb, chktupb;
    NOBTIItem chkiitem;
    NOBTLItem chklitem;
    char *stkstra, *chkstra;
    char *stkstrb, *chkstrb;
    int cmpsiza, cmpsizb;
    bool inconsistent;
    bool isleaf;

    page = BufferGetPage(buf, 0);
    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
    inconsistent = false;

    if (opaque->nobtpo_flags & NOBTP_LEAF)
	isleaf = true;
    else
	isleaf = false;

    /*
     *  For the no-overwrite implementation, here are the things that can
     *  cause us to have to move around in the tree:
     *
     *    +  The page we have just come to has just split, but the parent
     *	     now contains the correct entries for the new children, so the
     *       tree is consistent.  In this case, we adjust the parent pointer
     *       stack, move right as far as necessary, and continue.
     *
     *    +  The tree is genuinely inconsistent -- there was a failure during
     *       the write of the modified tree to disk, and some set of pages
     *       failed to make it out.  In this case, we must immediately repair
     *       the damage.  We're on one of the modified pages.
     *
     *  Short-term Lehman and Yao locking guarantees that if no failure has
     *  occurred, then case one must have happened.  The no-overwrite algorithm
     *	by Sullivan guarantees that we can recover from two in the face of
     *  any failure.
     */

#ifdef	SHADOW
    /* first, if this page has been replaced, then move to the new page */
    while ((newpg = opaque->nobtpo_replaced) != P_NONE) {
	_nobt_relbuf(rel, buf, access);
	buf = _nobt_getbuf(rel, newpg, access);
	page = BufferGetPage(buf, 0);
	opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
	inconsistent = true;
    }
#endif	/* SHADOW */

    /*
     *  If the key range on the parent doesn't match the key range on the
     *  child at which we've arrived (after possibly following "replaced"
     *  links above), then the tree may be inconsistent.  The hairy code
     *  structure (next page, prev page == P_NONE -- four cases) is due to
     *  the fact that the high and low key in the index are handled specially
     *  to avoid expensive updates.
     */

#ifndef	NORMAL
    if (opaque->nobtpo_next == P_NONE) {
	if (opaque->nobtpo_prev == P_NONE) {
	    if (stack != (NOBTStack) NULL)
		inconsistent = true;
	} else if (stack == NULL) {
	    inconsistent = true;
	} else {
	    stkstra = ((char *) stack->nobts_btitem) + sizeof(NOBTIItemData);
	    if (isleaf) {
		chklitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, 0));
		chktupa = &(chklitem->nobtli_itup);
		chkstra = ((char *) chktupa) + sizeof(IndexTupleData);
		cmpsiza = IndexTupleSize(chktupa) - sizeof(IndexTupleData);
	    } else {
		chkiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, 0));
		chkstra = ((char *) chkiitem) + sizeof(NOBTIItemData);
		cmpsiza = NOBTIItemGetSize(chkiitem) - sizeof(NOBTIItemData);
	    }
	    if (bcmp(stkstra, chkstra, cmpsiza) != 0)
		inconsistent = true;
	}
    } else {
	if (stack == NULL || stack->nobts_nxtitem == (NOBTIItem) NULL) {
	    inconsistent = true;
	} else if (opaque->nobtpo_prev == P_NONE) {
	    if (stack->nobts_offset != 0)
		inconsistent = true;
	} else {
	    /* stack tuple a, check tuple a are the low key on the page */
	    stkstra = ((char *) stack->nobts_btitem) + sizeof(NOBTIItemData);
	    if (isleaf) {
		chklitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, 1));
		chktupa = &(chklitem->nobtli_itup);
		chkstra = ((char *) chktupa) + sizeof(IndexTupleData);
		cmpsiza = IndexTupleSize(chktupa) - sizeof(IndexTupleData);
	    } else {
		chkiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, 1));
		chkstra = ((char *) chkiitem) + sizeof(NOBTIItemData);
		cmpsiza = NOBTIItemGetSize(chkiitem) - sizeof(NOBTIItemData);
	    }

	    /* stack tuple b, check tuple b are the high key */
	    stkstrb = ((char *) stack->nobts_nxtitem) + sizeof(NOBTIItemData);
	    if (isleaf) {
		chklitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, 0));
		chktupb = &(chklitem->nobtli_itup);
		chkstrb = ((char *) chktupb) + sizeof(IndexTupleData);
		cmpsizb = IndexTupleSize(chktupb) - sizeof(IndexTupleData);
	    } else {
		chkiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, 0));
		chkstrb = ((char *) chkiitem) + sizeof(NOBTIItemData);
		cmpsizb = NOBTIItemGetSize(chkiitem) - sizeof(NOBTIItemData);
	    }

	    if (bcmp(stkstra, chkstra, cmpsiza) != 0
		|| bcmp(stkstrb, chkstrb, cmpsizb) != 0)
		inconsistent = true;
	}
    }
#endif	/* ndef NORMAL */

    /* XXX XXX XXX peer pointer check? */

    /* if the tree may be inconsistent, it has to be fixed... */
    if (inconsistent) {
	elog(NOTICE, "inconsistency detected, should do something here...");
    }

    return (buf);
}

/*
 *  _nobt_skeycmp() -- compare a scan key to a particular item on a page using
 *		     a requested strategy (<, <=, =, >=, >).
 *
 *	We ignore the sequence numbers stored in the btree item here.  Those
 *	numbers are intended for use internally only, in repositioning a
 *	scan after a page split.  They do not impose any meaningful ordering.
 *
 *	The comparison is A <op> B, where A is the scan key and B is the
 *	tuple pointed at by itemid on page.
 */

bool
_nobt_skeycmp(rel, keysz, scankey, page, itemid, strat)
    Relation rel;
    Size keysz;
    ScanKey scankey;
    Page page;
    ItemId itemid;
    StrategyNumber strat;
{
    NOBTIItem iitem;
    NOBTLItem litem;
    IndexTuple indexTuple;
    TupleDescriptor tupDes;
    ScanKeyEntry entry;
    int i;
    Datum attrDatum;
    Datum keyDatum;
    NOBTPageOpaque opaque;
    bool compare;
    bool isNull;
    char *tempd;

    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

    tupDes = RelationGetTupleDescriptor(rel);
    if (opaque->nobtpo_flags & NOBTP_LEAF) {
	litem = (NOBTLItem) PageGetItem(page, itemid);
	indexTuple = &(litem->nobtli_itup);

	/* see if the comparison is true for all of the key attributes */
	for (i=1; i <= keysz; i++) {

	    entry = &scankey->data[i-1];
	    attrDatum = IndexTupleGetAttributeValue(indexTuple,
						    entry->attributeNumber,
						    tupDes, &isNull);
	    keyDatum  = entry->argument;

	    compare = _nobt_invokestrat(rel, i, strat, keyDatum, attrDatum);
	    if (!compare)
		return (false);
	}
    } else {
	iitem = (NOBTIItem) PageGetItem(page, itemid);
	tempd = ((char *) iitem) + sizeof(NOBTIItemData);

	entry = &scankey->data[0];
	keyDatum  = entry->argument;
	attrDatum = (Datum) fetchatt(((struct attribute **) tupDes), tempd);

	compare = _nobt_invokestrat(rel, 1, strat, keyDatum, attrDatum);
	if (!compare)
	    return (false);
    }
    return (true);
}

/*
 *  _nobt_binsrch() -- Do a binary search for a key on a particular page.
 *
 *	The scankey we get has the compare function stored in the procedure
 *	entry of each data struct.  We invoke this regproc to do the
 *	comparison for every key in the scankey.  _nobt_binsrch() returns
 *	the OffsetIndex of the first matching key on the page, or the
 *	OffsetIndex at which the matching key would appear if it were
 *	on this page.
 *
 *	By the time this procedure is called, we're sure we're looking
 *	at the right page -- don't need to walk right.  _nobt_binsrch() has
 *	no lock or refcount side effects on the buffer.
 */

OffsetIndex
_nobt_binsrch(rel, buf, keysz, scankey, srchtype)
    Relation rel;
    Buffer buf;
    int keysz;
    ScanKey scankey;
    int srchtype;
{
    TupleDescriptor itupdesc;
    Page page;
    NOBTPageOpaque opaque;
    OffsetIndex low, mid, high;
    bool match;
    int result;

    page = BufferGetPage(buf, 0);
    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

    /* by convention, item 0 on any non-leftmost page is the high key */
    if (opaque->nobtpo_next != P_NONE)
	low = 1;
    else
	low = 0;

    high = PageGetMaxOffsetIndex(page);

    /*
     *  Since for non-leftmost pages, the zeroeth item on the page is the
     *  high key, there are two notions of emptiness.  One is if nothing
     *  appears on the page.  The other is if nothing but the high key does.
     */

    if (PageIsEmpty(page) || (opaque->nobtpo_next != P_NONE && high == low))
	return (low);

    itupdesc = RelationGetTupleDescriptor(rel);
    match = false;

    while ((high - low) > 1) {
	mid = low + ((high - low) / 2);
	result = _nobt_compare(rel, itupdesc, page, keysz, scankey, mid);

	if (result > 0)
	    low = mid;
	else if (result < 0)
	    high = mid - 1;
	else {
	    match = true;
	    break;
	}
    }

    /* if we found a match, we want to find the first one on the page */
    if (match) {
	return (_nobt_firsteq(rel, itupdesc, page, keysz, scankey, mid));
    } else {

	/*
	 *  We terminated because the endpoints got too close together.  There
	 *  are two cases to take care of.
	 *
	 *  For non-insertion searches on internal pages, we want to point at
	 *  the last key <, or first key =, the scankey on the page.  This
	 *  guarantees that we'll descend the tree correctly.
	 *
	 *  For all other cases, we want to point at the first key >=
	 *  the scankey on the page.  This guarantees that scans and
	 *  insertions will happen correctly.
	 */

	opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
	if (!(opaque->nobtpo_flags & NOBTP_LEAF) && srchtype == NOBT_DESCENT) {

	    /*
	     *  We want the last key <, or first key ==, the scan key.
	     */

	    result = _nobt_compare(rel, itupdesc, page, keysz, scankey, high);

	    if (result == 0) {
		return (_nobt_firsteq(rel, itupdesc, page, keysz, scankey, high));
	    } else if (result > 0) {
		return (high);
	    } else {
		return (low);
	    }
	    /* NOTREACHED */
	} else {

	    /* we want the first key >= the scan key */
	    result = _nobt_compare(rel, itupdesc, page, keysz, scankey, low);
	    if (result <= 0) {
		return (low);
	    } else {
		if (low == high)
		    return (low + 1);

		result = _nobt_compare(rel, itupdesc, page, keysz, scankey, high);
		if (result <= 0)
		    return (high);
		else
		    return (high + 1);
	    }
	}
	/* NOTREACHED */
    }
}

OffsetIndex
_nobt_firsteq(rel, itupdesc, page, keysz, scankey, offind)
    Relation rel;
    TupleDescriptor itupdesc;
    Page page;
    Size keysz;
    ScanKey scankey;
    OffsetIndex offind;
{
    NOBTPageOpaque opaque;
    OffsetIndex limit;

    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

    /* skip the high key, if any */
    if (opaque->nobtpo_next != P_NONE)
	limit = 1;
    else
	limit = 0;

    /* walk backwards looking for the first key in the chain of duplicates */
    while (offind > limit
	   && _nobt_compare(rel, itupdesc, page,
			  keysz, scankey, offind - 1) == 0) {
	offind--;
    }

    return (offind);
}

/*
 *  _nobt_compare() -- Compare scankey to a particular tuple on the page.
 *
 *	This routine returns:
 *	    -1 if scankey < tuple at offind;
 *	     0 if scankey == tuple at offind;
 *	    +1 if scankey > tuple at offind.
 *
 *	In order to avoid having to propogate changes up the tree any time
 *	a new minimal key is inserted, the leftmost entry on the leftmost
 *	page is less than all possible keys, by definition.
 */

int
_nobt_compare(rel, itupdesc, page, keysz, scankey, offind)
    Relation rel;
    TupleDescriptor itupdesc;
    Page page;
    int keysz;
    ScanKey scankey;
    OffsetIndex offind;
{
    Datum datum;
    NOBTLItem btlitem;
    NOBTIItem btiitem;
    ItemId itemid;
    IndexTuple itup;
    NOBTPageOpaque opaque;
    ScanKeyEntry entry;
    AttributeNumber attno;
    int result;
    int i;
    char *tempd;
    Boolean null;
    bool isleaf;

    /*
     *  If this is a leftmost internal page, and if our comparison is
     *  with the first key on the page, then the item at that position is
     *  by definition less than the scan key.
     */

    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

    if (opaque->nobtpo_flags & NOBTP_LEAF)
	isleaf = true;
    else
	isleaf = false;

    if (!isleaf && opaque->nobtpo_prev == P_NONE && offind == 0) {
	    itemid = PageGetItemId(page, offind);

	    /*
	     *  If the item on the page is equal to the scankey, that's
	     *  okay to admit.  We just can't claim that the first key on
	     *  the page is greater than anything.
	     */

	    if (_nobt_skeycmp(rel, keysz, scankey, page, itemid,
			    NOBTEqualStrategyNumber)) {
		return (0);
	    }
	    return (1);
    }

    /*
     *  The scan key is set up with the attribute number associated with each
     *  term in the key.  It is important that, if the index is multi-key,
     *  the scan contain the first k key attributes, and that they be in
     *  order.  If you think about how multi-key ordering works, you'll
     *  understand why this is.
     *
     *  We don't test for violation of this condition here.
     */

    entry = &(scankey->data[0]);

    if (isleaf) {
	btlitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, offind));
	itup = &(btlitem->nobtli_itup);
	attno = entry->attributeNumber;
	datum = (Datum) IndexTupleGetAttributeValue(itup, attno,
						    itupdesc, &null);
    } else {
	btiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, offind));
	tempd = ((char *) btiitem) + sizeof(NOBTIItemData);
	datum = (Datum) fetchatt(((struct attribute **) itupdesc), tempd);
    }

    result = (int) (*(entry->func))(entry->argument, datum);
    return (result);
}

/*
 *  _nobt_next() -- Get the next item in a scan.
 *
 *	On entry, we have a valid currentItemData in the scan, and a
 *	read lock on the page that contains that item.  We do not have
 *	the page pinned.  We return the next item in the scan.  On
 *	exit, we have the page containing the next item locked but not
 *	pinned.
 */

RetrieveIndexResult
_nobt_next(scan, dir)
    IndexScanDesc scan;
    ScanDirection dir;
{
    Relation rel;
    NOBTPageOpaque opaque;
    Buffer buf;
    Page page;
    OffsetIndex offind, maxoff;
    RetrieveIndexResult res;
    BlockNumber blkno;
    ItemPointer current;
    ItemPointer iptr;
    NOBTLItem btitem;
    IndexTuple itup;
    NOBTScanOpaque so;

    rel = scan->relation;
    so = (NOBTScanOpaque) scan->opaque;
    current = &(scan->currentItemData);

    if (!BufferIsValid(so->nobtso_curbuf))
	so->nobtso_curbuf = _nobt_getbuf(rel, ItemPointerGetBlockNumber(current),
				     NOBT_READ);

    /* we still have the buffer pinned and locked */
    buf = so->nobtso_curbuf;
    blkno = BufferGetBlockNumber(buf);

    /* step one tuple in the appropriate direction */
    if (!_nobt_step(scan, &buf, dir))
	return ((RetrieveIndexResult) NULL);

    /* by here, current is the tuple we want to return */
    offind = ItemPointerGetOffsetNumber(current, 0) - 1;
    page = BufferGetPage(buf, 0);
    btitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, offind));
    itup = &btitem->nobtli_itup;

    if (_nobt_checkqual(scan, itup)) {
	iptr = (ItemPointer) palloc(sizeof(ItemPointerData));
	bcopy((char *) &(itup->t_tid), (char *) iptr, sizeof(ItemPointerData));
	res = ItemPointerFormRetrieveIndexResult(current, iptr);

	/* remember which buffer we have pinned and locked */
	so->nobtso_curbuf = buf;
    } else {
	ItemPointerSetInvalid(current);
	so->nobtso_curbuf = InvalidBuffer;
	_nobt_relbuf(rel, buf, NOBT_READ);
	res = (RetrieveIndexResult) NULL;
    }

    return (res);
}

/*
 *  _nobt_first() -- Find the first item in a scan.
 *
 *	We need to be clever about the type of scan, the operation it's
 *	performing, and the tree ordering.  We return the RetrieveIndexResult
 *	of the first item in the tree that satisfies the qualification
 *	associated with the scan descriptor.  On exit, the page containing
 *	the current index tuple is read locked and pinned, and the scan's
 *	opaque data entry is updated to include the buffer.
 */

RetrieveIndexResult
_nobt_first(scan, dir)
    IndexScanDesc scan;
    ScanDirection dir;
{
    Relation rel;
    TupleDescriptor itupdesc;
    Buffer buf;
    Page page;
    NOBTStack stack;
    OffsetIndex offind, maxoff;
    NOBTLItem btitem;
    IndexTuple itup;
    ItemPointer current;
    ItemPointer iptr;
    BlockNumber blkno;
    StrategyNumber strat;
    RetrieveIndexResult res;
    RegProcedure proc;
    int result;
    NOBTScanOpaque so;
    ScanKeyData skdata;

    /* if we just need to walk down one edge of the tree, do that */
    if (scan->scanFromEnd)
	return (_nobt_endpoint(scan, dir));

    rel = scan->relation;
    itupdesc = RelationGetTupleDescriptor(scan->relation);
    current = &(scan->currentItemData);
    so = (NOBTScanOpaque) scan->opaque;

    /*
     *  Okay, we want something more complicated.  What we'll do is use
     *  the first item in the scan key passed in (which has been correctly
     *  ordered to take advantage of index ordering) to position ourselves
     *  at the right place in the scan.
     */

    /*
     *  XXX -- The attribute number stored in the scan key is the attno
     *	       in the heap relation.  We need to transmogrify this into
     *         the index relation attno here.  For the moment, we have
     *	       hardwired attno == 1.
     */
    proc = index_getprocid(rel, 1, NOBTORDER_PROC);
    ScanKeyEntryInitialize(&skdata, 0x0, 1, proc,
			   scan->keyData.data[0].argument);

    stack = _nobt_search(rel, 1, &skdata, &buf);
    _nobt_freestack(stack);

    /* find the nearest match to the manufactured scan key on the page */
    offind = _nobt_binsrch(rel, buf, 1, &skdata, NOBT_DESCENT);
    page = BufferGetPage(buf, 0);
    maxoff = PageGetMaxOffsetIndex(page);

    if (offind > maxoff)
	offind = maxoff;

    blkno = BufferGetBlockNumber(buf);
    ItemPointerSet(current, 0, blkno, 0, offind + 1);

    /* now find the right place to start the scan */
    result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
    strat = _nobt_getstrat(rel, 1, scan->keyData.data[0].procedure);

    switch (strat) {
      case NOBTLessStrategyNumber:
	if (result <= 0) {
	    do {
		if (!_nobt_twostep(scan, &buf, BackwardScanDirection))
		    break;

		offind = ItemPointerGetOffsetNumber(current, 0) - 1;
		page = BufferGetPage(buf, 0);
		result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
	    } while (result <= 0);

	    /* if this is true, the key we just looked at is gone */
	    if (result > 0)
		(void) _nobt_twostep(scan, &buf, ForwardScanDirection);
	}
	break;

      case NOBTLessEqualStrategyNumber:
	if (result >= 0) {
	    do {
		if (!_nobt_twostep(scan, &buf, ForwardScanDirection))
		    break;

		offind = ItemPointerGetOffsetNumber(current, 0) - 1;
		page = BufferGetPage(buf, 0);
		result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
	    } while (result >= 0);

	    if (result < 0)
		(void) _nobt_twostep(scan, &buf, BackwardScanDirection);
	}
	break;

      case NOBTEqualStrategyNumber:
	if (result != 0) {
	  _nobt_relbuf(scan->relation, buf, NOBT_READ);
	  so->nobtso_curbuf = InvalidBuffer;
	  ItemPointerSetInvalid(&(scan->currentItemData));
	  return ((RetrieveIndexResult) NULL);
	}
	break;

      case NOBTGreaterEqualStrategyNumber:
	if (result < 0) {
	    do {
		if (!_nobt_twostep(scan, &buf, BackwardScanDirection))
		    break;

		offind = ItemPointerGetOffsetNumber(current, 0) - 1;
		page = BufferGetPage(buf, 0);
		result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
	    } while (result < 0);
	}
	break;

      case NOBTGreaterStrategyNumber:
	if (result >= 0) {
	    do {
		if (!_nobt_twostep(scan, &buf, ForwardScanDirection))
		    break;

		offind = ItemPointerGetOffsetNumber(current, 0) - 1;
		page = BufferGetPage(buf, 0);
		result = _nobt_compare(rel, itupdesc, page, 1, &skdata, offind);
	    } while (result >= 0);
	}
	break;
    }

    /* okay, current item pointer for the scan is right */
    offind = ItemPointerGetOffsetNumber(current, 0) - 1;
    page = BufferGetPage(buf, 0);
    btitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, offind));
    itup = &btitem->nobtli_itup;

    if (_nobt_checkqual(scan, itup)) {
	iptr = (ItemPointer) palloc(sizeof(ItemPointerData));
	bcopy((char *) &(itup->t_tid), (char *) iptr, sizeof(ItemPointerData));
	res = ItemPointerFormRetrieveIndexResult(current, iptr);

	/* remember which buffer we have pinned */
	so->nobtso_curbuf = buf;
    } else {
	ItemPointerSetInvalid(current);
	so->nobtso_curbuf = InvalidBuffer;
	_nobt_relbuf(rel, buf, NOBT_READ);
	res = (RetrieveIndexResult) NULL;
    }

    return (res);
}

/*
 *  _nobt_step() -- Step one item in the requested direction in a scan on
 *		  the tree.
 *
 *	If no adjacent record exists in the requested direction, return
 *	false.  Else, return true and set the currentItemData for the
 *	scan to the right thing.
 */

bool
_nobt_step(scan, bufP, dir)
    IndexScanDesc scan;
    Buffer *bufP;
    ScanDirection dir;
{
    Page page;
    NOBTPageOpaque opaque;
    OffsetIndex offind, maxoff;
    OffsetIndex start;
    BlockNumber blkno;
    BlockNumber obknum;
    NOBTScanOpaque so;
    ItemPointer current;
    Relation rel;

    rel = scan->relation;
    current = &(scan->currentItemData);
    offind = ItemPointerGetOffsetNumber(current, 0) - 1;
    page = BufferGetPage(*bufP, 0);
    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
    so = (NOBTScanOpaque) scan->opaque;
    maxoff = PageGetMaxOffsetIndex(page);

    /* get the next tuple */
    if (ScanDirectionIsForward(dir)) {
	if (offind < maxoff) {
	    offind++;
	} else {

	    /* if we're at end of scan, release the buffer and return */
	    if ((blkno = opaque->nobtpo_next) == P_NONE) {
		_nobt_relbuf(rel, *bufP, NOBT_READ);
		ItemPointerSetInvalid(current);
		*bufP = so->nobtso_curbuf = InvalidBuffer;
		return (false);
	    } else {

		/* walk right to the next page with data */
		_nobt_relbuf(rel, *bufP, NOBT_READ);
		for (;;) {
		    *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
		    page = BufferGetPage(*bufP, 0);
		    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
		    maxoff = PageGetMaxOffsetIndex(page);
		    if (opaque->nobtpo_next != P_NONE)
			start = 1;
		    else
			start = 0;

		    if (!PageIsEmpty(page) && start <= maxoff) {
			break;
		    } else {
			blkno = opaque->nobtpo_next;
			_nobt_relbuf(rel, *bufP, NOBT_READ);
			if (blkno == P_NONE) {
			    *bufP = so->nobtso_curbuf = InvalidBuffer;
			    ItemPointerSetInvalid(current);
			    return (false);
			}
		    }
		}
		offind = start;
	    }
	}
    } else if (ScanDirectionIsBackward(dir)) {

	/* remember that high key is item zero on non-rightmost pages */
	if (opaque->nobtpo_next == P_NONE)
	    start = 0;
	else
	    start = 1;

	if (offind > start) {
	    offind--;
	} else {

	    /* if we're at end of scan, release the buffer and return */
	    if ((blkno = opaque->nobtpo_prev) == P_NONE) {
		_nobt_relbuf(rel, *bufP, NOBT_READ);
		*bufP = so->nobtso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(current);
		return (false);
	    } else {

		obknum = BufferGetBlockNumber(*bufP);

		/* walk right to the next page with data */
		_nobt_relbuf(rel, *bufP, NOBT_READ);
		for (;;) {
		    *bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
		    page = BufferGetPage(*bufP, 0);
		    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
		    maxoff = PageGetMaxOffsetIndex(page);

		    /*
		     *  If the adjacent page just split, then we may have the
		     *  wrong block.  Handle this case.  Because pages only
		     *  split right, we don't have to worry about this failing
		     *  to terminate.
		     */

		    while (opaque->nobtpo_next != obknum) {
			blkno = opaque->nobtpo_next;
			_nobt_relbuf(rel, *bufP, NOBT_READ);
			*bufP = _nobt_getbuf(rel, blkno, NOBT_READ);
			page = BufferGetPage(*bufP, 0);
			opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
			maxoff = PageGetMaxOffsetIndex(page);
		    }

		    /* don't consider the high key */
		    if (opaque->nobtpo_next == P_NONE)
			start = 0;
		    else
			start = 1;

		    /* anything to look at here? */
		    if (!PageIsEmpty(page) && maxoff >= start) {
			break;
		    } else {
			blkno = opaque->nobtpo_prev;
			obknum = BufferGetBlockNumber(*bufP);
			_nobt_relbuf(rel, *bufP, NOBT_READ);
			if (blkno == P_NONE) {
			    *bufP = so->nobtso_curbuf = InvalidBuffer;
			    ItemPointerSetInvalid(current);
			    return (false);
			}
		    }
		}
		offind = maxoff;
	    }
	}
    }
    blkno = BufferGetBlockNumber(*bufP);
    so->nobtso_curbuf = *bufP;
    ItemPointerSet(current, 0, blkno, 0, offind + 1);

    return (true);
}

/*
 *  _nobt_twostep() -- Move to an adjacent record in a scan on the tree,
 *		     if an adjacent record exists.
 *
 *	This is like _nobt_step, except that if no adjacent record exists
 *	it restores us to where we were before trying the step.  This is
 *	only hairy when you cross page boundaries, since the page you cross
 *	from could have records inserted or deleted, or could even split.
 *	This is unlikely, but we try to handle it correctly here anyway.
 *
 *	This routine contains the only case in which our changes to Lehman
 *	and Yao's algorithm.
 *
 *	Like step, this routine leaves the scan's currentItemData in the
 *	proper state and acquires a lock and pin on *bufP.  If the twostep
 *	succeeded, we return true; otherwise, we return false.
 */

bool
_nobt_twostep(scan, bufP, dir)
    IndexScanDesc scan;
    Buffer *bufP;
    ScanDirection dir;
{
    Page page;
    NOBTPageOpaque opaque;
    OffsetIndex offind, maxoff;
    OffsetIndex start;
    ItemPointer current;
    ItemId itemid;
    int itemsz;
    NOBTLItem btitem;
    NOBTLItem svitem;
    BlockNumber blkno;
    int nbytes;

    blkno = BufferGetBlockNumber(*bufP);
    page = BufferGetPage(*bufP, 0);
    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetIndex(page);
    current = &(scan->currentItemData);
    offind = ItemPointerGetOffsetNumber(current, 0) - 1;

    if (opaque->nobtpo_next != P_NONE)
	start = 1;
    else
	start = 0;

    /* if we're safe, just do it */
    if (ScanDirectionIsForward(dir) && offind < maxoff) {
	ItemPointerSet(current, 0, blkno, 0, offind + 2);
	return (true);
    } else if (ScanDirectionIsBackward(dir) && offind < start) {
	ItemPointerSet(current, 0, blkno, 0, offind);
	return (true);
    }

    /* if we've hit end of scan we don't have to do any work */
    if (ScanDirectionIsForward(dir) && opaque->nobtpo_next == P_NONE)
	return (false);
    else if (ScanDirectionIsBackward(dir) && opaque->nobtpo_prev == P_NONE)
	return (false);

    /*
     *  Okay, it's off the page; let _nobt_step() do the hard work, and we'll
     *  try to remember where we were.  This is not guaranteed to work; this
     *  is the only place in the code where concurrency can screw us up,
     *  and it's because we want to be able to move in two directions in
     *  the scan.
     */

    itemid = PageGetItemId(page, offind);
    itemsz = ItemIdGetLength(itemid);
    btitem = (NOBTLItem) PageGetItem(page, itemid);
    svitem = (NOBTLItem) palloc(itemsz);
    bcopy((char *) btitem, (char *) svitem, itemsz);

    if (_nobt_step(scan, bufP, dir)) {
	pfree(svitem);
	return (true);
    }

    /* try to find our place again */
    *bufP = _nobt_getbuf(scan->relation, blkno, NOBT_READ);
    page = BufferGetPage(*bufP, 0);
    maxoff = PageGetMaxOffsetIndex(page);
    nbytes = sizeof(NOBTLItemData) - sizeof(IndexTupleData);

    while (offind <= maxoff) {
	itemid = PageGetItemId(page, offind);
	btitem = (NOBTLItem) PageGetItem(page, itemid);
	if (bcmp((char *) btitem, (char *) svitem, nbytes) == 0) {
	    pfree (svitem);
	    ItemPointerSet(current, 0, blkno, 0, offind + 1);
	    return (false);
	}
    }

    /*
     *  XXX crash and burn -- can't find our place.  We can be a little
     *  smarter -- walk to the next page to the right, for example, since
     *  that's the only direction that splits happen in.  Deletions screw
     *  us up less often since they're only done by the vacuum daemon.
     */

    elog(WARN, "btree synchronization error:  concurrent update botched scan");

    /* NOTREACHED */
}

/*
 *  _nobt_endpoint() -- Find the first or last key in the index.
 */

RetrieveIndexResult
_nobt_endpoint(scan, dir)
    IndexScanDesc scan;
    ScanDirection dir;
{
    Relation rel;
    Buffer buf;
    Page page;
    NOBTPageOpaque opaque;
    ItemPointer current;
    ItemPointer iptr;
    OffsetIndex offind, maxoff;
    OffsetIndex start;
    BlockNumber blkno;
    NOBTLItem btitem;
    NOBTIItem btiitem;
    IndexTuple itup;
    NOBTScanOpaque so;
    RetrieveIndexResult res;

    rel = scan->relation;
    current = &(scan->currentItemData);

    buf = _nobt_getroot(rel, NOBT_READ);
    blkno = BufferGetBlockNumber(buf);
    page = BufferGetPage(buf, 0);
    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

    for (;;) {
	if (opaque->nobtpo_flags & NOBTP_LEAF)
	    break;

	if (ScanDirectionIsForward(dir)) {
	    if (opaque->nobtpo_next == P_NONE)
		offind = 0;
	    else
		offind = 1;
	} else
	    offind = PageGetMaxOffsetIndex(page);

	btiitem = (NOBTIItem) PageGetItem(page, PageGetItemId(page, offind));
	blkno = btiitem->nobtii_child;

	_nobt_relbuf(rel, buf, NOBT_READ);
	buf = _nobt_getbuf(rel, blkno, NOBT_READ);
	page = BufferGetPage(buf, 0);
	opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);

	/*
	 *  Race condition:  If the child page we just stepped onto is in
	 *  the process of being split, we need to make sure we're all the
	 *  way at the edge of the tree.  See the paper by Lehman and Yao.
	 */

	if (ScanDirectionIsBackward(dir) && opaque->nobtpo_next != P_NONE) {
	    do {
		blkno = opaque->nobtpo_next;
		_nobt_relbuf(rel, buf, NOBT_READ);
		buf = _nobt_getbuf(rel, blkno, NOBT_READ);
		page = BufferGetPage(buf, 0);
		opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
	    } while (opaque->nobtpo_next != P_NONE);
	}
    }

    /* okay, we've got the {left,right}-most page in the tree */
    maxoff = PageGetMaxOffsetIndex(page);

    if (ScanDirectionIsForward(dir)) {
	maxoff = PageGetMaxOffsetIndex(page);
	if (opaque->nobtpo_next == P_NONE)
	    start = 0;
	else
	    start = 1;

	if (PageIsEmpty(page) || start > maxoff) {
	    ItemPointerSet(current, 0, blkno, 0, maxoff + 1);
	    if (!_nobt_step(scan, &buf, BackwardScanDirection))
		return ((RetrieveIndexResult) NULL);

	    start = ItemPointerGetOffsetNumber(current, 0);
	    page = BufferGetPage(buf);
	} else {
	    ItemPointerSet(current, 0, blkno, 0, start + 1);
	}
    } else if (ScanDirectionIsBackward(dir)) {
	start = PageGetMaxOffsetIndex(page);
	if (PageIsEmpty(page)) {
	    ItemPointerSet(current, 0, blkno, 0, start + 1);
	    if (!_nobt_step(scan, &buf, ForwardScanDirection))
		return ((RetrieveIndexResult) NULL);

	    start = ItemPointerGetOffsetNumber(current, 0);
	    page = BufferGetPage(buf);
	} else {
	    ItemPointerSet(current, 0, blkno, 0, start + 1);
	}
    } else {
	elog(WARN, "Illegal scan direction %d", dir);
    }

    btitem = (NOBTLItem) PageGetItem(page, PageGetItemId(page, start));
    itup = &(btitem->nobtli_itup);

    /* see if we picked a winner */
    if (_nobt_checkqual(scan, itup)) {
	iptr = (ItemPointer) palloc(sizeof(ItemPointerData));
	bcopy((char *) &(itup->t_tid), (char *) iptr, sizeof(ItemPointerData));
	res = ItemPointerFormRetrieveIndexResult(current, iptr);

	/* remember which buffer we have pinned */
	so = (NOBTScanOpaque) scan->opaque;
	so->nobtso_curbuf = buf;
    } else {
	_nobt_relbuf(rel, buf, NOBT_READ);
	res = (RetrieveIndexResult) NULL;
    }

    return (res);
}

#endif /* NOBTREE */
@


1.5
log
@all three implementations work correctly, need to collect timings
@
text
@d6 2
d26 1
a26 1
RcsId("$Header: /users/mao/nobtree/RCS/nobtsearch.c,v 1.4 1991/06/12 03:56:58 mao Exp mao $");
d1251 2
@


1.4
log
@#defines for normal, shadow implemenentation; both impls work
@
text
@d24 1
a24 1
RcsId("$Header: /users/mao/nobtree/RCS/nobtsearch.c,v 1.3 1991/06/11 07:41:44 mao Exp mao $");
d104 1
a104 1
#ifdef	SHADOW
d125 1
a125 1
#endif	/* SHADOW */
a198 1
#ifdef	SHADOW
d219 1
d228 1
d239 1
d299 1
a299 1
#endif	/* SHADOW */
@


1.3
log
@works with minimal per-tuple, per-page overhead
@
text
@d24 1
a24 1
RcsId("$Header: /users/mao/nobtree/RCS/nobtsearch.c,v 1.2 1991/06/10 21:05:53 mao Exp mao $");
d104 1
d125 1
d199 1
d297 1
@


1.2
log
@structures are too big, but the code works
@
text
@d21 1
d24 1
a24 1
RcsId("$Header: RCS/nobtsearch.c,v 1.1 91/05/28 14:34:15 mao Exp Locker: mao $");
d73 2
a74 2
    NOBTItem btitem;
    NOBTItem item_save;
a75 2
    IndexTuple itup;
    IndexTuple itup_save;
d92 2
a93 16
    btitem = (NOBTItem) PageGetItem(page, itemid);
    itup = &(btitem->nobti_itup);
    blkno = ItemPointerGetBlockNumber(&(itup->t_tid));

    /*
     *  We need to save the bit image of the index entry we chose in the
     *  parent page on a stack.  In case we split the tree, we'll use this
     *  bit image to figure out what our real parent page is, in case the
     *  parent splits while we're working lower in the tree.  See the paper
     *  by Lehman and Yao for how this is detected and handled.
     *
     *  Also, in the no-overwrite implementation, we use the keys that
     *  bound this pointer as the link token, to verify that we've traversed
     *  an active link in the tree.  For the no-overwrite case, we also need
     *  the next pointer.
     */
d95 1
d97 1
a97 1
    item_save = (NOBTItem) palloc(item_nbytes);
d107 1
a107 1
	    stack->nobts_nxtitem = (NOBTItem) NULL;
d111 2
a112 2
	    item_save = (NOBTItem) palloc(item_nbytes);
	    btitem = (NOBTItem) PageGetItem(page, itemid);
d119 2
a120 2
	item_save = (NOBTItem) palloc(item_nbytes);
	btitem = (NOBTItem) PageGetItem(page, itemid);
d180 2
a181 1
    NOBTItem chkitem;
d186 1
d192 5
d242 12
a253 8
	    stktupa = &(stack->nobts_btitem->nobti_itup);
	    stkstra = ((char *) stktupa) + sizeof(IndexTupleData);
	    chkitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, 0));
	    chktupa = &(chkitem->nobti_itup);
	    chkstra = ((char *) chktupa) + sizeof(IndexTupleData);
	    cmpsiza = IndexTupleSize(stktupa) - sizeof(IndexTupleData);
	    if (IndexTupleSize(stktupa) != IndexTupleSize(chktupa)
		|| bcmp(stkstra, chkstra, cmpsiza) != 0)
d257 1
a257 1
	if (stack == NULL || stack->nobts_nxtitem == (NOBTItem) NULL) {
d264 11
a274 6
	    stktupa = &(stack->nobts_btitem->nobti_itup);
	    stkstra = ((char *) stktupa) + sizeof(IndexTupleData);
	    chkitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, 1));
	    chktupa = &(chkitem->nobti_itup);
	    chkstra = ((char *) chktupa) + sizeof(IndexTupleData);
	    cmpsiza = IndexTupleSize(stktupa) - sizeof(IndexTupleData);
d277 13
a289 10
	    stktupb = &(stack->nobts_nxtitem->nobti_itup);
	    stkstrb = ((char *) stktupb) + sizeof(IndexTupleData);
	    chkitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, 0));
	    chktupb = &(chkitem->nobti_itup);
	    chkstrb = ((char *) chktupb) + sizeof(IndexTupleData);
	    cmpsizb = IndexTupleSize(stktupb) - sizeof(IndexTupleData);

	    if (IndexTupleSize(stktupa) != IndexTupleSize(chktupa)
		|| bcmp(stkstra, chkstra, cmpsiza) != 0
	        || IndexTupleSize(stktupb) != IndexTupleSize(chktupb)
d326 2
a327 1
    NOBTItem item;
d334 1
d337 1
d339 1
a339 2
    item = (NOBTItem) PageGetItem(page, itemid);
    indexTuple = &(item->nobti_itup);
d342 12
d355 7
a361 2
    /* see if the comparison is true for all of the key attributes */
    for (i=1; i <= keysz; i++) {
d363 1
a363 4
	entry = &scankey->data[i-1];
	attrDatum = IndexTupleGetAttributeValue(indexTuple,
						entry->attributeNumber,
						tupDes, &isNull);
d365 1
d367 1
a367 1
	compare = _nobt_invokestrat(rel, i, strat, keyDatum, attrDatum);
a370 1

d550 2
a551 1
    NOBTItem btitem;
d559 1
d561 1
d570 7
a576 3
    if (!(opaque->nobtpo_flags & NOBTP_LEAF)
	&& opaque->nobtpo_prev == P_NONE
	&& offind == 0) {
a591 3
    btitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, offind));
    itup = &(btitem->nobti_itup);

d602 5
a606 2
    for (i = 1; i <= keysz; i++) {
	entry = &(scankey->data[i - 1]);
d610 4
a613 5
	result = (int) (*(entry->func))(entry->argument, datum);

	/* if the keys are unequal, return the difference */
	if (result != 0)
	    return (result);
d616 2
a617 2
    /* by here, the keys are equal */
    return (0);
d644 1
a644 1
    NOBTItem btitem;
a651 6
    /*
     *  XXX 10 may 91:  somewhere there's a bug in our management of the
     *  cached buffer for this scan.  wei discovered it.  the following
     *  is a workaround so he can work until i figure out what's going on.
     */

d667 2
a668 2
    btitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, offind));
    itup = &btitem->nobti_itup;
d709 1
a709 1
    NOBTItem btitem;
d838 2
a839 2
    btitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, offind));
    itup = &btitem->nobti_itup;
d1040 2
a1041 2
    NOBTItem btitem;
    NOBTItem svitem;
d1082 2
a1083 2
    btitem = (NOBTItem) PageGetItem(page, itemid);
    svitem = (NOBTItem) palloc(itemsz);
d1095 1
a1095 1
    nbytes = sizeof(NOBTItemData) - sizeof(IndexTupleData);
d1099 1
a1099 1
	btitem = (NOBTItem) PageGetItem(page, itemid);
d1137 2
a1138 1
    NOBTItem btitem;
d1163 2
a1164 4
	btitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, offind));
	itup = &(btitem->nobti_itup);

	blkno = ItemPointerGetBlockNumber(&(itup->t_tid));
d1224 2
a1225 2
    btitem = (NOBTItem) PageGetItem(page, PageGetItemId(page, start));
    itup = &(btitem->nobti_itup);
@


1.1
log
@Initial revision
@
text
@d21 1
a21 1
#include "access/nbtree.h"
d23 1
a23 4
int16	*MaoBlock1;
Buffer	*MaoBuffer1;
int16	*MaoBlock2;
Buffer	*MaoBuffer2;
a24 2
RcsId("$Header: RCS/nbtsearch.c,v 1.20 91/05/10 13:59:17 mao Exp $");

d68 1
a77 1
    /* if this is a leaf page, we're done */
d80 2
d102 6
a107 3
     *  by Lehman and Yao for how this is detected and handled.  (We use
     *  sequence numbers to disambiguate duplicate keys in the index --
     *  Lehman and Yao disallow duplicate keys).
d113 2
a114 1
    stack = (NOBTStack) palloc(sizeof(NONOBTStackData));
a115 1
    stack->nobts_offset = offind;
d117 22
d151 1
a151 1
    *bufP = _nobt_moveright(rel, *bufP, keysz, scankey, NOBT_READ);
d177 1
a177 1
_nobt_moveright(rel, buf, keysz, scankey, access)
d183 1
d187 1
d192 7
d202 1
d204 19
a222 3
    /* if we're on a rightmost page, we don't need to move right */
    if (opaque->nobtpo_next == P_NONE)
	return (buf);
d224 8
a231 2
    /* by convention, item 0 on non-rightmost pages is the high key */
    hikey = PageGetItemId(page, 0);
d234 6
a239 3
     *  If the scan key that brought us to this page is >= the high key
     *  stored on the page, then the page has split and we need to move
     *  right.
d242 47
a288 2
    if (_nobt_skeycmp(rel, keysz, scankey, page, hikey,
		    NOBTGreaterEqualStrategyNumber)) {
d290 1
a290 15
	/* move right as long as we need to */
	do {
	    /*
	     *  If this page consists of all duplicate keys (hikey and first
	     *  key on the page have the same value), then we don't need to
	     *  step right.
	     */
	    if (PageGetMaxOffsetIndex(page) > 0) {
		itemid = PageGetItemId(page, 1);
		if (_nobt_skeycmp(rel, keysz, scankey, page, itemid,
				NOBTEqualStrategyNumber)) {
		    /* break is for the "move right" while loop */
		    break;
		}
	    }
d292 3
a294 11
	    /* step right one page */
	    rblkno = opaque->nobtpo_next;
	    _nobt_relbuf(rel, buf, access);
	    buf = _nobt_getbuf(rel, rblkno, access);
	    page = BufferGetPage(buf, 0);
	    opaque = (NOBTPageOpaque) PageGetSpecialPointer(page);
	    hikey = PageGetItemId(page, 0);

	} while (opaque->nobtpo_next != P_NONE
		 && _nobt_skeycmp(rel, keysz, scankey, page, hikey,
				 NOBTGreaterEqualStrategyNumber));
d296 1
d1074 1
a1074 1
    nbytes = sizeof(NONOBTItemData) - sizeof(IndexTupleData);
@
