#include "tmp/postgres.h"
#include "tmp/c.h"

#include "storage/bufpage.h"
#include "utils/log.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/tqual.h"

#include "catalog/pg_relation.h"

/*
 *  dustbuster() -- a portable postgres vacuum cleaner.
 *
 *	This routine doesn't do any updates on its own behalf, so it
 *	wouldn't really matter if it violated two-phase locking.  However,
 *	due to a bug I don't understand in the lock manager, we can't
 *	even take short-term write locks in the inner scan here -- the
 *	lock manager complains about an attempt to release a write lock
 *	that we definitely hold.
 *
 *	Since the only thing we update is stuff that other routines have
 *	to fill in anyway, this shouldn't matter very much.  Still, you
 *	might feel better running this single-user.
 *
 *	Takes no parameters; returns the number of tuples cleaned, in lieu
 *	of any more interesting result.
 */

int32
dustbuster()
{
    Relation pgrel;
    TupleDescriptor tupdesc;
    HeapScanDesc relscan;
    HeapScanDesc scan;
    Name rname;
    char rtype;
    Relation onerel;
    HeapTuple htup;
    HeapTuple reltup;
    Buffer buf, buf2;
    Page page;
    OffsetIndex offind, maxoff;
    ItemId itemid;
    int blkno, nblocks;
    int nchanges;
    Datum d;
    Boolean tupchng, changed;
    Boolean n;

    pgrel = heap_openr(Name_pg_relation);
    tupdesc = RelationGetTupleDescriptor(pgrel);
    relscan = heap_beginscan(pgrel, 0, NULL, 0, NULL);

    nchanges = 0;
    while (HeapTupleIsValid(reltup = heap_getnext(relscan, 0, &buf))) {
	if (!HeapTupleSatisfiesTimeQual(reltup, NowTimeQual))
	    continue;

	d = (Datum) heap_getattr(reltup, buf, Anum_pg_relation_relkind,
				 tupdesc, &n);
	rtype = (char) DatumGetChar(d);
	if (rtype != 'r')
	    continue;

	d = (Datum) heap_getattr(reltup, buf, Anum_pg_relation_relname,
				 tupdesc, &n);
	rname = (Name) DatumGetName(d);
	onerel = heap_openr(rname);
	nblocks = RelationGetNumberOfBlocks(onerel);

	for (blkno = 0; blkno < nblocks; blkno++) {

	    changed = false;
	    buf2 = ReadBuffer(onerel, blkno);
	    page = BufferGetPage(buf2, 0);
	    maxoff = PageGetMaxOffsetIndex(page);

	    for (offind = 0; offind <= maxoff; offind++) {

		itemid = PageGetItemId(page, offind);
		if (!ItemIdIsUsed(itemid))
		    continue;

		htup = (HeapTuple) PageGetItem(page, itemid);
		tupchng = false;

		if (htup->t_tmin == 0 && TransactionIdIsValid(htup->t_xmin)
		    && TransactionIdDidCommit(htup->t_xmin)) {
		    htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
		    tupchng = changed = true;
		}
		if (htup->t_tmax == 0 && TransactionIdIsValid(htup->t_xmax)
		    && TransactionIdDidCommit(htup->t_xmax)) {
		    htup->t_tmax = TransactionIdGetCommitTime(htup->t_xmax);
		    tupchng = changed = true;
		}
		if (tupchng)
		    nchanges++;
	    }
	    if (changed) {
		WriteBuffer(buf2);
	    } else
		ReleaseBuffer(buf2);
	}
	heap_close(onerel);
    }

    heap_endscan(relscan);
    heap_close(pgrel);

    return (nchanges);
}
