/*-------------------------------------------------------------------------
 *
 * sinvaladt.c--
 *    POSTGRES shared cache invalidation segment definitions.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    /usr/local/devel/pglite/cvs/src/backend/storage/ipc/sinvaladt.c,v 1.4 1995/06/02 20:34:42 jolly Exp
 *
 *-------------------------------------------------------------------------
 */
#include "storage/ipc.h"
#include "storage/sinvaladt.h"
#include "storage/lmgr.h"
#include "utils/elog.h"
#include "utils/palloc.h"

/* ----------------
 *	global variable notes
 *
 *	SharedInvalidationSemaphore
 *
 *	shmInvalBuffer
 *		the shared buffer segment, set by SISegmentAttach()
 *
 *	MyBackendId
 *		might be removed later, used only for
 * 		debugging in debug routines (end of file)
 *
 *	SIDbId
 *		identification of buffer (disappears)
 *
 *	SIRelId		\ 
 *	SIDummyOid	 \  identification of buffer
 *	SIXidData	 /
 *	SIXid		/
 *
 *  XXX This file really needs to be cleaned up.  We switched to using
 *	spinlocks to protect critical sections (as opposed to using fake
 *	relations and going through the lock manager) and some of the old
 *	cruft was 'ifdef'ed out, while other parts (now unused) are still
 *	compiled into the system. -mer 5/24/92
 * ----------------
 */
#ifdef HAS_TEST_AND_SET
int SharedInvalidationLockId;
#else
IpcSemaphoreId	SharedInvalidationSemaphore;
#endif

SISeg		*shmInvalBuffer;	
extern BackendId MyBackendId;

static void CleanupInvalidationState(int status, SISeg *segInOutP);
static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag);
static int SIGetNumEntries(SISeg *segP);

/************************************************************************/
/* SISetActiveProcess(segP, backendId)	set the backend status active	*/
/*  	should be called only by the postmaster when creating a backend	*/
/************************************************************************/
/* XXX I suspect that the segP parameter is extraneous. -hirohama */
static void
SISetActiveProcess(SISeg *segInOutP, BackendId backendId)
{
    /* mark all messages as read */
    
    /* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */
    
    segInOutP->procState[backendId - 1].resetState = false;
    segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP);
}

/****************************************************************************/
/* SIBackendInit()  initializes a backend to operate on the buffer  	    */
/****************************************************************************/
int
SIBackendInit(SISeg *segInOutP)
{
    LRelId  	    	    LtCreateRelId();
    TransactionId           LMITransactionIdCopy();
    
    Assert(MyBackendTag > 0);
    
    MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag);
    if (MyBackendId == InvalidBackendTag)
	return 0;
    
#ifdef	INVALIDDEBUG
    elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
	 MyBackendTag, MyBackendId);
#endif	/* INVALIDDEBUG */
    
    SISetActiveProcess(segInOutP, MyBackendId);
    on_exitpg(CleanupInvalidationState, (caddr_t)segInOutP);
    return 1;
}

/* ----------------
 *	SIAssignBackendId
 * ----------------
 */
static BackendId
SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag)
{
    Index		index;
    ProcState	*stateP;
    
    stateP = NULL;
    
    for (index = 0; index < MaxBackendId; index += 1) {
	if (segInOutP->procState[index].tag == InvalidBackendTag ||
	    segInOutP->procState[index].tag == backendTag)
	    {
		stateP = &segInOutP->procState[index];
		break;
	    }
	
	if (!PointerIsValid(stateP) ||
	    (segInOutP->procState[index].resetState &&
	     (!stateP->resetState ||
	      stateP->tag < backendTag)) ||
	    (!stateP->resetState &&
	     (segInOutP->procState[index].limit <
	      stateP->limit ||
	      stateP->tag < backendTag)))
	    {
		stateP = &segInOutP->procState[index];
	    }
    }
    
    /* verify that all "procState" entries checked for matching tags */
    
    for (index += 1; index < MaxBackendId; index += 1) {
	if (segInOutP->procState[index].tag == backendTag) {
	    elog (FATAL, "SIAssignBackendId: tag %d found twice",
		  backendTag);
	}
    }
    
    if (stateP->tag != InvalidBackendTag) {
	if (stateP->tag == backendTag) {
	    elog(NOTICE, "SIAssignBackendId: reusing tag %d",
		 backendTag);
	} else {
	    elog(NOTICE,
		 "SIAssignBackendId: discarding tag %d",
		 stateP->tag);
	    return InvalidBackendTag;
	}
    }
    
    stateP->tag = backendTag;
    
    return (1 + stateP - &segInOutP->procState[0]);
}


/************************************************************************/
/* The following function should be called only by the postmaster !!    */
/************************************************************************/

/************************************************************************/
/* SISetDeadProcess(segP, backendId)  set the backend status DEAD   	*/
/*  	should be called only by the postmaster when a backend died 	*/
/************************************************************************/
static void
SISetDeadProcess(SISeg *segP, int backendId)
{
    /* XXX call me.... */
    
    segP->procState[backendId - 1].resetState = false;
    segP->procState[backendId - 1].limit = -1;
    segP->procState[backendId - 1].tag = InvalidBackendTag;
}

/*
 * CleanupInvalidationState --
 * Note:
 *	This is a temporary hack.  ExitBackend should call this instead
 *	of exit (via on_exitpg).
 */
static void
CleanupInvalidationState(int status, /* XXX */
			 SISeg *segInOutP) /* XXX style */
{
    Assert(PointerIsValid(segInOutP));
    
    SISetDeadProcess(segInOutP, MyBackendId);
}


/************************************************************************/
/* SIComputeSize()  - retuns the size of a buffer segment   	    	*/
/************************************************************************/
static SISegOffsets *
SIComputeSize(int *segSize)
{
    int      	 A, B, a, b, totalSize;
    SISegOffsets *oP;
    
    A = 0;
    a = SizeSISeg;  	/* offset to first data entry */
    b = SizeOfOneSISegEntry * MAXNUMMESSAGES;
    B = A + a + b;
    totalSize = B - A;
    *segSize = totalSize;
    
    oP = (SISegOffsets *) palloc(sizeof(SISegOffsets));
    oP->startSegment = A;
    oP->offsetToFirstEntry = a; /* relatiove to A */
    oP->offsetToEndOfSegemnt = totalSize; /* relative to A */
    return(oP);
}


/************************************************************************/
/* SISetStartEntrySection(segP, offset)     - sets the offset		*/
/************************************************************************/
static void
SISetStartEntrySection(SISeg *segP, Offset offset)
{
    segP->startEntrySection = offset;
}

/************************************************************************/
/* SIGetStartEntrySection(segP)     - returnss the offset   		*/
/************************************************************************/
static Offset
SIGetStartEntrySection(SISeg *segP)
{
    return(segP->startEntrySection);
}


/************************************************************************/
/* SISetEndEntrySection(segP, offset) 	- sets the offset   		*/
/************************************************************************/
static void
SISetEndEntrySection(SISeg *segP, Offset offset)
{
    segP->endEntrySection = offset;
}

/************************************************************************/
/* SISetEndEntryChain(segP, offset) 	- sets the offset   	    	*/
/************************************************************************/
static void
SISetEndEntryChain(SISeg *segP, Offset offset)
{
    segP->endEntryChain = offset;
}

/************************************************************************/
/* SIGetEndEntryChain(segP) 	- returnss the offset	    	    	*/
/************************************************************************/
static Offset
SIGetEndEntryChain(SISeg *segP)
{
    return(segP->endEntryChain);
}

/************************************************************************/
/* SISetStartEntryChain(segP, offset) 	- sets the offset   	    	*/
/************************************************************************/
static void
SISetStartEntryChain(SISeg *segP, Offset offset)
{
    segP->startEntryChain = offset;
}

/************************************************************************/
/* SIGetStartEntryChain(segP) 	- returns  the offset	    	    	*/
/************************************************************************/
static Offset
SIGetStartEntryChain(SISeg *segP)
{
    return(segP->startEntryChain);
}

/************************************************************************/
/* SISetNumEntries(segP, num)	sets the current nuber of entries   	*/
/************************************************************************/
static bool
SISetNumEntries(SISeg *segP, int num)
{
    if ( num <= MAXNUMMESSAGES) {
        segP->numEntries =  num;
        return(true);
    } else {
        return(false);  /* table full */
    }    
}

/************************************************************************/
/* SIGetNumEntries(segP)    - returns the current nuber of entries  	*/
/************************************************************************/
static int
SIGetNumEntries(SISeg *segP)
{
    return(segP->numEntries);
}


/************************************************************************/
/* SISetMaxNumEntries(segP, num)    sets the maximal number of entries	*/
/************************************************************************/
static bool
SISetMaxNumEntries(SISeg *segP, int num)
{
    if ( num <= MAXNUMMESSAGES) {
        segP->maxNumEntries =  num;
        return(true);
    } else {
        return(false);  /* wrong number */
    }   
}


/************************************************************************/
/* SIGetProcStateLimit(segP, i)	returns the limit of read messages  	*/
/************************************************************************/
static int
SIGetProcStateLimit(SISeg *segP, int i)
{
    return(segP->procState[i].limit);
}

/************************************************************************/
/* SIIncNumEntries(segP, num)	increments the current nuber of entries	*/
/************************************************************************/
static bool
SIIncNumEntries(SISeg *segP, int num)
{
    if ((segP->numEntries + num) <= MAXNUMMESSAGES) {
        segP->numEntries = segP->numEntries + num;
        return(true);
    } else {
        return(false);  /* table full */
    }   
}

/************************************************************************/
/* SIDecNumEntries(segP, num)	decrements the current nuber of entries	*/
/************************************************************************/
static bool
SIDecNumEntries(SISeg *segP, int num)
{
    if ((segP->numEntries - num) >=  0) {
        segP->numEntries = segP->numEntries - num;
        return(true);
    } else {
        return(false);  /* not enough entries in table */
    }   
}

/************************************************************************/
/* SISetStartFreeSpace(segP, offset)  - sets the offset	    	    	*/
/************************************************************************/
static void
SISetStartFreeSpace(SISeg *segP, Offset offset)
{
    segP->startFreeSpace = offset;
}

/************************************************************************/
/* SIGetStartFreeSpace(segP)  - returns the offset  	    	    	*/
/************************************************************************/
static Offset
SIGetStartFreeSpace(SISeg *segP)
{
    return(segP->startFreeSpace);
}



/************************************************************************/
/* SIGetFirstDataEntry(segP)  returns first data entry	    	    	*/
/************************************************************************/
static SISegEntry *
SIGetFirstDataEntry(SISeg *segP)
{
    SISegEntry  *eP;
    Offset      startChain;
    
    startChain = SIGetStartEntryChain(segP);
    
    if (startChain == InvalidOffset)
    	return(NULL);
    
    eP = (SISegEntry  *) ((Pointer) segP + 
			  SIGetStartEntrySection(segP) +
			  startChain );
    return(eP);
}


/************************************************************************/
/* SIGetLastDataEntry(segP)  returns last data entry in the chain   	*/
/************************************************************************/
static SISegEntry *
SIGetLastDataEntry(SISeg *segP)
{
    SISegEntry  *eP;
    Offset      endChain;
    
    endChain = SIGetEndEntryChain(segP);
    
    if (endChain == InvalidOffset)
    	return(NULL);
    
    eP = (SISegEntry  *) ((Pointer) segP + 
			  SIGetStartEntrySection(segP) +
			  endChain );
    return(eP);
}

/************************************************************************/
/* SIGetNextDataEntry(segP, offset)  returns next data entry	    	*/
/************************************************************************/
static SISegEntry *
SIGetNextDataEntry(SISeg *segP, Offset offset)
{
    SISegEntry  *eP;
    
    if (offset == InvalidOffset)
    	return(NULL);
    
    eP = (SISegEntry  *) ((Pointer) segP +
                          SIGetStartEntrySection(segP) + 
                          offset);
    return(eP);
}


/************************************************************************/
/* SIGetNthDataEntry(segP, n)	returns the n-th data entry in chain	*/
/************************************************************************/
static SISegEntry *
SIGetNthDataEntry(SISeg *segP,
		  int n)	/* must range from 1 to MaxMessages */
{
    SISegEntry  *eP;
    int	    	i;
    
    if (n <= 0) return(NULL);
    
    eP = SIGetFirstDataEntry(segP);
    for (i = 1; i < n; i++) {
    	/* skip one and get the next	*/
    	eP = SIGetNextDataEntry(segP, eP->next);
    }
    
    return(eP);
}

/************************************************************************/
/* SIEntryOffset(segP, entryP)   returns the offset for an pointer  	*/
/************************************************************************/
static Offset
SIEntryOffset(SISeg *segP, SISegEntry *entryP)
{
    /* relative to B !! */
    return ((Offset) ((Pointer) entryP -
                      (Pointer) segP - 
                      SIGetStartEntrySection(segP) ));
}


/************************************************************************/
/* SISetDataEntry(segP, data)  - sets a message in the segemnt	    	*/
/************************************************************************/
bool
SISetDataEntry(SISeg *segP, SharedInvalidData  *data)
{
    Offset  	    offsetToNewData;
    SISegEntry 	    *eP, *lastP;
    bool    	    SISegFull();
    Offset  	    SIEntryOffset();
    Offset  	    SIGetStartFreeSpace();
    SISegEntry 	    *SIGetFirstDataEntry();
    SISegEntry 	    *SIGetNextDataEntry();
    SISegEntry 	    *SIGetLastDataEntry();
    
    if (!SIIncNumEntries(segP, 1)) 
	return(false);  /* no space */
    
    /* get a free entry */
    offsetToNewData = SIGetStartFreeSpace(segP);
    eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */
    SISetStartFreeSpace(segP, eP->next);
    /* fill it up */
    eP->entryData = *data;
    eP->isfree = false;
    eP->next = InvalidOffset;
    
    /* handle insertion point at the end of the chain !!*/
    lastP = SIGetLastDataEntry(segP);
    if (lastP == NULL) {
    	/* there is no chain, insert the first entry */
    	SISetStartEntryChain(segP, SIEntryOffset(segP, eP));
    } else {
    	/* there is a last entry in the chain */
    	lastP->next = SIEntryOffset(segP, eP);
    }
    SISetEndEntryChain(segP, SIEntryOffset(segP, eP));
    return(true);
}


/************************************************************************/
/* SIDecProcLimit(segP, num)  decrements all process limits 	    	*/
/************************************************************************/
static void
SIDecProcLimit(SISeg *segP, int num)
{
    int i;
    for (i=0; i < MaxBackendId; i++) {
    	/* decrement only, if there is a limit > 0  */
    	if (segP->procState[i].limit > 0) {
    	    segP->procState[i].limit = segP->procState[i].limit - num;
    	    if (segP->procState[i].limit < 0) {
    	    	/* limit was not high enough, reset to zero */
    	    	/* negative means it's a dead backend	    */
    	    	segP->procState[i].limit = 0;
    	    }
    	}
    }
}


/************************************************************************/
/* SIDelDataEntry(segP)	    - free the FIRST entry   	    	    	*/
/************************************************************************/
bool
SIDelDataEntry(SISeg *segP)
{
    SISegEntry 	    *e1P;
    SISegEntry 	    *SIGetFirstDataEntry();
    
    if (!SIDecNumEntries(segP, 1))  {
    	/* no entries in buffer */
    	return(false);
    }
    
    e1P = SIGetFirstDataEntry(segP);
    SISetStartEntryChain(segP, e1P->next);
    if (SIGetStartEntryChain(segP) == InvalidOffset) {
    	/* it was the last entry */
    	SISetEndEntryChain(segP, InvalidOffset);
    }
    /* free the entry */
    e1P->isfree = true;
    e1P->next = SIGetStartFreeSpace(segP);
    SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
    SIDecProcLimit(segP, 1);
    return(true); 
}



/************************************************************************/
/* SISetProcStateInvalid(segP)	checks and marks a backends state as 	*/
/*  	    	    	    	    invalid 	    	    	    	*/
/************************************************************************/
void
SISetProcStateInvalid(SISeg *segP)
{
    int i;
    
    for (i=0; i < MaxBackendId; i++) {
    	if (segP->procState[i].limit == 0) {
    	    /* backend i didn't read any message    	    	    	*/
    	    segP->procState[i].resetState = true;
    	    /*XXX signal backend that it has to reset its internal cache ? */
    	}
    }
}

/************************************************************************/
/* SIReadEntryData(segP, backendId, function)	    	    	    	*/
/*  	    	    	- marks messages to be read by id   	    	*/
/*  	    	          and executes function	    	    	    	*/
/************************************************************************/
void
SIReadEntryData(SISeg *segP,
		int backendId,
		void (*invalFunction)(),
		void (*resetFunction)())
{
    int i = 0;
    SISegEntry *data;
    
    Assert(segP->procState[backendId - 1].tag == MyBackendTag);
    
    if (!segP->procState[backendId - 1].resetState) {
    	/* invalidate data, but only those, you have not seen yet !!*/
    	/* therefore skip read messages */
    	data = SIGetNthDataEntry(segP, 
    	    	    	    	 SIGetProcStateLimit(segP, backendId - 1) + 1);
    	while (data != NULL) {
    	    i++;
    	    segP->procState[backendId - 1].limit++;  /* one more message read */
    	    invalFunction(data->entryData.cacheId, 
			  data->entryData.hashIndex,
			  &data->entryData.pointerData);
    	    data = SIGetNextDataEntry(segP, data->next);
    	}
    	/* SIDelExpiredDataEntries(segP); */
    } else {
    	/*backend must not read messages, its own state has to be reset	    */
    	elog(NOTICE, "SIMarkEntryData: cache state reset");
        resetFunction(); /* XXXX call it here, parameters? */
	
	/* new valid state--mark all messages "read" */
	segP->procState[backendId - 1].resetState = false;
	segP->procState[backendId - 1].limit = SIGetNumEntries(segP);
    }
    /* check whether we can remove dead messages    	    	    	    */
    if (i > MAXNUMMESSAGES) {
    	elog(FATAL, "SIReadEntryData: Invalid segment state");
    }
}

/************************************************************************/
/* SIDelExpiredDataEntries  (segP)  - removes irrelevant messages   	*/
/************************************************************************/
void
SIDelExpiredDataEntries(SISeg *segP)
{
    int   min, i, h;
    
    min = 9999999;
    for (i = 0; i < MaxBackendId; i++) {
    	h = SIGetProcStateLimit(segP, i);
    	if (h >= 0)  { /* backend active */
    	    if (h < min ) min = h;
    	}
    }
    if (min != 9999999) {
    	/* we can remove min messages */
    	for (i = 1; i <= min; i++) {
    	    /* this  adjusts also the state limits!*/
    	    if (!SIDelDataEntry(segP)) { 
            	elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
    	    }
    	}
    }
}



/************************************************************************/
/* SISegInit(segP)  - initializes the segment	    	    	    	*/
/************************************************************************/
static void
SISegInit(SISeg *segP)
{
    SISegOffsets    *oP;
    int	    	    segSize, i;
    SISegEntry      *eP;
    
    oP = SIComputeSize(&segSize);
    /* set sempahore ids in the segment */
    /* XXX */
    SISetStartEntrySection(segP, oP->offsetToFirstEntry);
    SISetEndEntrySection(segP, oP->offsetToEndOfSegemnt);
    SISetStartFreeSpace(segP, 0);
    SISetStartEntryChain(segP, InvalidOffset);
    SISetEndEntryChain(segP, InvalidOffset);
    (void) SISetNumEntries(segP, 0);
    (void) SISetMaxNumEntries(segP, MAXNUMMESSAGES);
    for (i = 0; i < MaxBackendId; i++) {
    	segP->procState[i].limit = -1; 	    /* no backend active  !!*/
    	segP->procState[i].resetState = false;
	segP->procState[i].tag = InvalidBackendTag;
    }
    /* construct a chain of free entries    	    	    	    */
    for (i = 1; i < MAXNUMMESSAGES; i++)  {
    	eP = (SISegEntry  *) ((Pointer) segP +
			      SIGetStartEntrySection(segP) +
			      (i - 1) * sizeof(SISegEntry));
    	eP->isfree = true;
    	eP->next = i * sizeof(SISegEntry); /* relative to B */
    }
    /* handle the last free entry separate  	    	    	    */
    eP = (SISegEntry  *) ((Pointer) segP +
			  SIGetStartEntrySection(segP) +
			  (MAXNUMMESSAGES - 1) * sizeof(SISegEntry));
    eP->isfree = true;
    eP->next = InvalidOffset;  /* it's the end of the chain !! */
    /*
     * Be tidy
     */
    pfree(oP);
    
}



/************************************************************************/
/* SISegmentKill(key)   - kill any segment                              */
/************************************************************************/
static void
SISegmentKill(int key)	/* the corresponding key for the segment */
{   
    IpcMemoryKill(key);
}	


/************************************************************************/
/* SISegmentGet(key, size)  - get a shared segment of size <size>       */
/*                returns a segment id                                  */
/************************************************************************/
static IpcMemoryId
SISegmentGet(int key,		/* the corresponding key for the segment */
	     int size,		/* size of segment in bytes              */
	     bool create)
{
    IpcMemoryId   shmid;
    
    if (create) {
	shmid = IpcMemoryCreate(key, size, IPCProtection);
    } else {
	shmid = IpcMemoryIdGet(key, size);
    }
    return(shmid);
}

/************************************************************************/
/* SISegmentAttach(shmid)   - attach a shared segment with id shmid     */
/************************************************************************/
static void
SISegmentAttach(IpcMemoryId shmid)
{
    shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid);
    if (shmInvalBuffer == IpcMemAttachFailed) {   
	/* XXX use validity function */
	elog(NOTICE, "SISegmentAttach: Could not attach segment");
	elog(FATAL, "SISegmentAttach: %m");
    }
}


/************************************************************************/
/* SISegmentInit(killExistingSegment, key)  initialize segment	    	*/
/************************************************************************/
int
SISegmentInit(bool killExistingSegment, IPCKey key)
{ 
    SISegOffsets	*oP;
    int     	    	segSize;
    IpcMemoryId	    	shmId;
    bool    	    	create;
    
    if (killExistingSegment) {
        /* Kill existing segment */
        /* set semaphore */
    	SISegmentKill(key);
    	
        /* Get a shared segment */
	
        oP = SIComputeSize(&segSize);
	/*
	 * Be tidy
	 */
	pfree(oP);
	
        create = true;
        shmId = SISegmentGet(key,segSize, create);
        if (shmId < 0) {
            perror("SISegmentGet: failed");
            return(-1);                                     /* an error */
        }
	
        /* Attach the shared cache invalidation  segment */
        /* sets the global variable shmInvalBuffer */
        SISegmentAttach(shmId);
	
        /* Init shared memory table */
        SISegInit(shmInvalBuffer);  
    } else {
    	/* use an existing segment */
    	create = false;
    	shmId = SISegmentGet(key, 0, create);
    	if (shmId < 0) {
    	    perror("SISegmentGet: getting an existent segment failed");
    	    return(-1);	    	    	    	    	    /* an error */
    	}
    	/* Attach the shared cache invalidation segment */
      	SISegmentAttach(shmId);
    }
    return(1);
}

