/*-------------------------------------------------------------------------
 *
 * hashovfl.c--
 *    Overflow page management code for the Postgres hash access method
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $Header: /usr/local/devel/pglite/cvs/src/backend/access/hash/hashovfl.c,v 1.8 1996/02/24 00:00:53 jolly Exp $
 *
 * NOTES
 *    Overflow pages look like ordinary relation pages.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/bufpage.h"

#include "utils/elog.h"
#include "utils/rel.h"
#include "utils/excid.h"

#include "access/genam.h"
#include "access/hash.h"

static OverflowPageAddress _hash_getovfladdr(Relation rel, Buffer *metabufp);
static uint32 _hash_firstfreebit(uint32 map);

/*
 *  _hash_addovflpage
 *
 *  Add an overflow page to the page currently pointed to by the buffer 
 *  argument 'buf'. 
 *
 *  *Metabufp has a read lock upon entering the function; buf has a 
 *  write lock. 
 *  
 */
Buffer
_hash_addovflpage(Relation rel, Buffer *metabufp, Buffer buf)
{
    
    OverflowPageAddress oaddr;
    BlockNumber ovflblkno;
    Buffer ovflbuf;
    HashMetaPage metap;
    HashPageOpaque ovflopaque;
    HashPageOpaque pageopaque;
    Page page;
    Page ovflpage;
    
    /* this had better be the last page in a bucket chain */
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE|LH_OVERFLOW_PAGE);
    pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
    Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno));
    
    metap = (HashMetaPage) BufferGetPage(*metabufp);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    /* allocate an empty overflow page */
    oaddr = _hash_getovfladdr(rel, metabufp);
    if (oaddr == InvalidOvflAddress) {
	elog(WARN, "_hash_addovflpage: problem with _hash_getovfladdr.");
    }
    ovflblkno = OADDR_TO_BLKNO(OADDR_OF(SPLITNUM(oaddr), OPAGENUM(oaddr)));
    Assert(BlockNumberIsValid(ovflblkno));
    ovflbuf = _hash_getbuf(rel, ovflblkno, HASH_WRITE);
    Assert(BufferIsValid(ovflbuf));
    ovflpage = BufferGetPage(ovflbuf);

    /* initialize the new overflow page */
    _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
    ovflopaque->hasho_nextblkno = InvalidBlockNumber;
    ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
    ovflopaque->hasho_oaddr = oaddr;
    ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
    _hash_wrtnorelbuf(rel, ovflbuf);
    
    /* logically chain overflow page to previous page */
    pageopaque->hasho_nextblkno = ovflblkno;
    _hash_wrtnorelbuf(rel, buf);
    return (ovflbuf);
}

/*
 *  _hash_getovfladdr()
 *
 *  Find an available overflow page and return its address. 
 *
 *  When we enter this function, we have a read lock on *metabufp which
 *  we change to a write lock immediately. Before exiting, the write lock
 *  is exchanged for a read lock. 
 *
 */
static OverflowPageAddress
_hash_getovfladdr(Relation rel, Buffer *metabufp)
{
    HashMetaPage metap;
    Buffer mapbuf;
    BlockNumber blkno;
    PageOffset offset;
    OverflowPageAddress oaddr;
    SplitNumber splitnum;
    uint32 *freep;
    uint32 max_free; 
    uint32 bit;
    uint32 first_page; 
    uint32 free_bit; 
    uint32 free_page; 
    uint32 in_use_bits;
    uint32 i, j;
    
    metap = (HashMetaPage) _hash_chgbufaccess(rel, metabufp, HASH_READ, HASH_WRITE);
    
    splitnum = metap->OVFL_POINT;
    max_free = metap->SPARES[splitnum];
    
    free_page = (max_free - 1) >> (metap->BSHIFT + BYTE_TO_BIT);
    free_bit = (max_free - 1) & (BMPGSZ_BIT(metap) - 1);
    
    /* Look through all the free maps to find the first free block */
    first_page = metap->LAST_FREED >> (metap->BSHIFT + BYTE_TO_BIT);
    for ( i = first_page; i <= free_page; i++ ) {
	Page mappage;

	blkno = metap->hashm_mapp[i];
	mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
	mappage = BufferGetPage(mapbuf);
	_hash_checkpage(mappage, LH_BITMAP_PAGE);
	freep = HashPageGetBitmap(mappage);
	Assert(freep);
	
	if (i == free_page)
	    in_use_bits = free_bit;
	else
	    in_use_bits = BMPGSZ_BIT(metap) - 1;
	
	if (i == first_page) {
	    bit = metap->LAST_FREED & (BMPGSZ_BIT(metap) - 1);
	    j = bit / BITS_PER_MAP;
	    bit = bit & ~(BITS_PER_MAP - 1);
	} else {
	    bit = 0;
	    j = 0;
	}
	for (; bit <= in_use_bits; j++, bit += BITS_PER_MAP)
	    if (freep[j] != ALL_SET)
		goto found;
    }
    
    /* No Free Page Found - have to allocate a new page */
    metap->LAST_FREED = metap->SPARES[splitnum];
    metap->SPARES[splitnum]++;
    offset = metap->SPARES[splitnum] -
	(splitnum ? metap->SPARES[splitnum - 1] : 0);
    
#define	OVMSG	"HASH: Out of overflow pages.  Out of luck.\n"
    
    if (offset > SPLITMASK) {
	if (++splitnum >= NCACHED) {
	    elog(WARN, OVMSG);
	}
	metap->OVFL_POINT = splitnum;
	metap->SPARES[splitnum] = metap->SPARES[splitnum-1];
	metap->SPARES[splitnum-1]--;
	offset = 0;
    }
    
    /* Check if we need to allocate a new bitmap page */
    if (free_bit == BMPGSZ_BIT(metap) - 1) {
	/* won't be needing old map page */

	_hash_relbuf(rel, mapbuf, HASH_WRITE);

	free_page++;
	if (free_page >= NCACHED) {
	    elog(WARN, OVMSG);
	}
	
	/*
	 * This is tricky.  The 1 indicates that you want the new page
	 * allocated with 1 clear bit.  Actually, you are going to
	 * allocate 2 pages from this map.  The first is going to be
	 * the map page, the second is the overflow page we were
	 * looking for.  The init_bitmap routine automatically, sets
	 * the first bit of itself to indicate that the bitmap itself
	 * is in use.  We would explicitly set the second bit, but
	 * don't have to if we tell init_bitmap not to leave it clear
	 * in the first place.
	 */
	if (_hash_initbitmap(rel, metap, OADDR_OF(splitnum, offset),
			     1, free_page)) {
	    elog(WARN, "overflow_page: problem with _hash_initbitmap.");
	}
	metap->SPARES[splitnum]++;
	offset++;
	if (offset > SPLITMASK) {
	    if (++splitnum >= NCACHED) {
		elog(WARN, OVMSG);
	    }
	    metap->OVFL_POINT = splitnum;
	    metap->SPARES[splitnum] = metap->SPARES[splitnum-1];
	    metap->SPARES[splitnum-1]--;
	    offset = 0;
	}
    } else {
	
	/*
	 * Free_bit addresses the last used bit.  Bump it to address
	 * the first available bit.
	 */
	free_bit++;
	SETBIT(freep, free_bit);
	_hash_wrtbuf(rel, mapbuf);
    }
    
    /* Calculate address of the new overflow page */
    oaddr = OADDR_OF(splitnum, offset);
    _hash_chgbufaccess(rel, metabufp, HASH_WRITE, HASH_READ);
    return (oaddr);
    
 found:
    bit = bit + _hash_firstfreebit(freep[j]);
    SETBIT(freep, bit);
    _hash_wrtbuf(rel, mapbuf);
    
    /*
     * Bits are addressed starting with 0, but overflow pages are addressed
     * beginning at 1. Bit is a bit addressnumber, so we need to increment
     * it to convert it to a page number.
     */
    
    bit = 1 + bit + (i * BMPGSZ_BIT(metap));
    if (bit >= metap->LAST_FREED) {
	metap->LAST_FREED = bit - 1;
    }
    
    /* Calculate the split number for this page */
    for (i = 0; (i < splitnum) && (bit > metap->SPARES[i]); i++)
	;
    offset = (i ? bit - metap->SPARES[i - 1] : bit);
    if (offset >= SPLITMASK) {
	elog(WARN, OVMSG);
    }
    
    /* initialize this page */
    oaddr = OADDR_OF(i, offset);
    _hash_chgbufaccess(rel, metabufp, HASH_WRITE, HASH_READ);
    return (oaddr);
}

/*
 *  _hash_firstfreebit()
 *
 *  Return the first bit that is not set in the argument 'map'. This
 *  function is used to find an available overflow page within a
 *  splitnumber. 
 * 
 */
static uint32
_hash_firstfreebit(uint32 map)
{
    uint32 i, mask;
    
    mask = 0x1;
    for (i = 0; i < BITS_PER_MAP; i++) {
	if (!(mask & map))
	    return (i);
	mask = mask << 1;
    }
    return (i);
}

/*
 *  _hash_freeovflpage() - 
 *
 *  Mark this overflow page as free and return a buffer with 
 *  the page that follows it (which may be defined as
 *  InvalidBuffer). 
 *
 */
Buffer
_hash_freeovflpage(Relation rel, Buffer ovflbuf)
{
    HashMetaPage metap;
    Buffer metabuf;
    Buffer mapbuf;
    BlockNumber prevblkno;
    BlockNumber blkno;
    BlockNumber nextblkno;
    HashPageOpaque ovflopaque;
    Page ovflpage;
    Page mappage;
    OverflowPageAddress addr;
    SplitNumber splitnum;
    uint32 *freep;
    uint32 ovflpgno;
    int32 bitmappage, bitmapbit;
    Bucket bucket;
    
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);
    
    ovflpage = BufferGetPage(ovflbuf);
    _hash_checkpage(ovflpage, LH_OVERFLOW_PAGE);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    addr = ovflopaque->hasho_oaddr;
    nextblkno = ovflopaque->hasho_nextblkno;
    prevblkno = ovflopaque->hasho_prevblkno;
    bucket = ovflopaque->hasho_bucket;
    (void) memset(ovflpage, 0, BufferGetPageSize(ovflbuf));
    _hash_wrtbuf(rel, ovflbuf);
    
    /* 
     * fix up the bucket chain.  this is a doubly-linked list, so we
     * must fix up the bucket chain members behind and ahead of the
     * overflow page being deleted.
     *
     * XXX this should look like:
     * - lock prev/next
     * - modify/write prev/next (how to do write ordering with a
     * doubly-linked list???)
     * - unlock prev/next
     */
    if (BlockNumberIsValid(prevblkno)) {
	Buffer prevbuf = _hash_getbuf(rel, prevblkno, HASH_WRITE);
	Page prevpage = BufferGetPage(prevbuf);
	HashPageOpaque prevopaque =
	    (HashPageOpaque) PageGetSpecialPointer(prevpage);

	_hash_checkpage(prevpage, LH_BUCKET_PAGE|LH_OVERFLOW_PAGE);
	Assert(prevopaque->hasho_bucket == bucket);
	prevopaque->hasho_nextblkno = nextblkno;
	_hash_wrtbuf(rel, prevbuf);
    }
    if (BlockNumberIsValid(nextblkno)) {
	Buffer nextbuf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
	Page nextpage = BufferGetPage(nextbuf);
	HashPageOpaque nextopaque =
	    (HashPageOpaque) PageGetSpecialPointer(nextpage);
	
	_hash_checkpage(nextpage, LH_OVERFLOW_PAGE);
	Assert(nextopaque->hasho_bucket == bucket);
	nextopaque->hasho_prevblkno = prevblkno;
	_hash_wrtbuf(rel, nextbuf);
    }
    
    /* 
     * Fix up the overflow page bitmap that tracks this particular
     * overflow page. The bitmap can be found in the MetaPageData
     * array element hashm_mapp[bitmappage].
     */
    splitnum = (addr >> SPLITSHIFT);
    ovflpgno =
	(splitnum ? metap->SPARES[splitnum - 1] : 0) + (addr & SPLITMASK) - 1;
    
    if (ovflpgno < metap->LAST_FREED) {
	metap->LAST_FREED = ovflpgno;
    }
    
    bitmappage = (ovflpgno >> (metap->BSHIFT + BYTE_TO_BIT));
    bitmapbit = ovflpgno & (BMPGSZ_BIT(metap) - 1);
    
    blkno = metap->hashm_mapp[bitmappage];
    mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
    mappage = BufferGetPage(mapbuf);
    _hash_checkpage(mappage, LH_BITMAP_PAGE);
    freep = HashPageGetBitmap(mappage);
    CLRBIT(freep, bitmapbit);
    _hash_wrtbuf(rel, mapbuf);
    
    _hash_relbuf(rel, metabuf, HASH_WRITE);
    
    /* 
     * now instantiate the page that replaced this one, 
     * if it exists, and return that buffer with a write lock.
     */
    if (BlockNumberIsValid(nextblkno)) {
	return (_hash_getbuf(rel, nextblkno, HASH_WRITE));
    } else {
	return (InvalidBuffer);
    }
}


/*
 *  _hash_initbitmap()
 *  
 *   Initialize a new bitmap page.  The metapage has a write-lock upon
 *   entering the function.
 *
 * 'pnum' is the OverflowPageAddress of the new bitmap page.
 * 'nbits' is how many bits to clear (i.e., make available) in the new
 * bitmap page.  the remainder of the bits (as well as the first bit,
 * representing the bitmap page itself) will be set.
 * 'ndx' is the 0-based offset of the new bitmap page within the
 * metapage's array of bitmap page OverflowPageAddresses.
 */

#define INT_MASK	((1 << INT_TO_BIT) -1)

int32
_hash_initbitmap(Relation rel,
		 HashMetaPage metap,
		 int32 pnum,
		 int32 nbits,
		 int32 ndx)
{
    Buffer buf;
    BlockNumber blkno;
    Page pg;
    HashPageOpaque op;
    uint32 *freep;
    int clearbytes, clearints;
    
    blkno = OADDR_TO_BLKNO(pnum);
    buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    pg = BufferGetPage(buf);
    _hash_pageinit(pg, BufferGetPageSize(buf));
    op = (HashPageOpaque) PageGetSpecialPointer(pg);
    op->hasho_oaddr = InvalidOvflAddress;
    op->hasho_prevblkno = InvalidBlockNumber;
    op->hasho_nextblkno = InvalidBlockNumber;
    op->hasho_flag = LH_BITMAP_PAGE;
    op->hasho_bucket = -1;

    freep = HashPageGetBitmap(pg);

    /* set all of the bits above 'nbits' to 1 */
    clearints = ((nbits - 1) >> INT_TO_BIT) + 1;
    clearbytes = clearints << INT_TO_BYTE;
    (void) memset((char *) freep, 0, clearbytes);
    (void) memset(((char *) freep) + clearbytes, 0xFF,
		  BMPGSZ_BYTE(metap) - clearbytes);
    freep[clearints - 1] = ALL_SET << (nbits & INT_MASK);

    /* bit 0 represents the new bitmap page */
    SETBIT(freep, 0);
        
    /* metapage already has a write lock */
    metap->hashm_nmaps++;
    metap->hashm_mapp[ndx] = blkno;
    
    /* write out the new bitmap page (releasing its locks) */
    _hash_wrtbuf(rel, buf);

    return (0);
}


/*
 *  _hash_squeezebucket(rel, bucket)
 *
 *  Try to squeeze the tuples onto pages occuring earlier in the
 *  bucket chain in an attempt to free overflow pages. When we start
 *  the "squeezing", the page from which we start taking tuples (the
 *  "read" page) is the last bucket in the bucket chain and the page
 *  onto which we start squeezing tuples (the "write" page) is the
 *  first page in the bucket chain.  The read page works backward and
 *  the write page works forward; the procedure terminates when the
 *  read page and write page are the same page.
 */
void
_hash_squeezebucket(Relation rel,
		    HashMetaPage metap, 
		    Bucket bucket)
{
    Buffer wbuf;
    Buffer rbuf;
    BlockNumber wblkno;		
    BlockNumber rblkno;		
    Page wpage;
    Page rpage;
    HashPageOpaque wopaque;
    HashPageOpaque ropaque;
    OffsetNumber woffnum;
    OffsetNumber roffnum;
    HashItem hitem;
    int itemsz;
    
/*    elog(DEBUG, "_hash_squeezebucket: squeezing bucket %d", bucket); */

    /*
     * start squeezing into the base bucket page.
     */
    wblkno = BUCKET_TO_BLKNO(bucket);
    wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
    wpage = BufferGetPage(wbuf);
    _hash_checkpage(wpage, LH_BUCKET_PAGE);
    wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
    
    /*
     * if there aren't any overflow pages, there's nothing to squeeze.
     */
    if (!BlockNumberIsValid(wopaque->hasho_nextblkno)) {
	_hash_relbuf(rel, wbuf, HASH_WRITE);
	return;
    }
    
    /*
     * find the last page in the bucket chain by starting at the base
     * bucket page and working forward.
     *
     * XXX if chains tend to be long, we should probably move forward
     * using HASH_READ and then _hash_chgbufaccess to HASH_WRITE when
     * we reach the end.  if they are short we probably don't care
     * very much.  if the hash function is working at all, they had
     * better be short..
     */
    ropaque = wopaque;
    do {
	rblkno = ropaque->hasho_nextblkno;
	if (ropaque != wopaque) {
	    _hash_relbuf(rel, rbuf, HASH_WRITE);
	}
	rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
	rpage = BufferGetPage(rbuf);
	_hash_checkpage(rpage, LH_OVERFLOW_PAGE);
	Assert(!PageIsEmpty(rpage));
	ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
	Assert(ropaque->hasho_bucket == bucket);
    } while (BlockNumberIsValid(ropaque->hasho_nextblkno));

    /*
     * squeeze the tuples.
     */
    roffnum = FirstOffsetNumber;
    for(;;) {
	hitem = (HashItem) PageGetItem(rpage, PageGetItemId(rpage, roffnum));
	itemsz = IndexTupleDSize(hitem->hash_itup) 
	    + (sizeof(HashItemData) - sizeof(IndexTupleData));
	itemsz = DOUBLEALIGN(itemsz);
	
	/*
	 * walk up the bucket chain, looking for a page big enough for
	 * this item.
	 */
	while (PageGetFreeSpace(wpage) < itemsz) {
	    wblkno = wopaque->hasho_nextblkno;

	    _hash_wrtbuf(rel, wbuf);

	    if (!BlockNumberIsValid(wblkno) || (rblkno == wblkno)) {
		_hash_wrtbuf(rel, rbuf);
		/* wbuf is already released */
		return;
	    }
	    
	    wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
	    wpage = BufferGetPage(wbuf);
	    _hash_checkpage(wpage, LH_OVERFLOW_PAGE);
	    Assert(!PageIsEmpty(wpage));
	    wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
	    Assert(wopaque->hasho_bucket == bucket);
	}
	
	/* 
	 * if we're here, we have found room so insert on the "write"
	 * page.
	 */
	woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
	(void) PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED);
	
	/* 
	 * delete the tuple from the "read" page.
	 * PageIndexTupleDelete repacks the ItemId array, so 'roffnum'
	 * will be "advanced" to the "next" ItemId.
	 */
	PageIndexTupleDelete(rpage, roffnum);
	_hash_wrtnorelbuf(rel, rbuf);
	
	/*
	 * if the "read" page is now empty because of the deletion,
	 * free it.
	 */
	if (PageIsEmpty(rpage) && (ropaque->hasho_flag & LH_OVERFLOW_PAGE)) {
	    rblkno = ropaque->hasho_prevblkno;
	    Assert(BlockNumberIsValid(rblkno));

	    /*
	     * free this overflow page.  the extra _hash_relbuf is
	     * because _hash_freeovflpage gratuitously returns the
	     * next page (we want the previous page and will get it
	     * ourselves later).
	     */
	    rbuf = _hash_freeovflpage(rel, rbuf);
	    if (BufferIsValid(rbuf)) {
		_hash_relbuf(rel, rbuf, HASH_WRITE);
	    }
	    
	    if (rblkno == wblkno) {
		/* rbuf is already released */
		_hash_wrtbuf(rel, wbuf);
		return;
	    }
	    
	    rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
	    rpage = BufferGetPage(rbuf);
	    _hash_checkpage(rpage, LH_OVERFLOW_PAGE);
	    Assert(!PageIsEmpty(rpage));
	    ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
	    Assert(ropaque->hasho_bucket == bucket);

	    roffnum = FirstOffsetNumber;
	}
    }
}
