/**********************************************************************
  $Header: buf_sync.c,v 1.2 89/02/02 13:47:15 dillon Exp $"
  buf_sync.c
  has code to provide access to buffers,
  buffer descriptors and the free list
  all of which are stored in shared memory

  contains the externally callable functions:

  CreateBufferSemaphore
  CreateBufferPoolMemory
  InitSharedBuffers
  InitBufferSemaphore
  InitBufferPoolMemory

  as well as the buffer-manager callable functions :

  ReadSharedBuffer
  WriteSharedBuffer
  FlushSharedBuffer
  ReleaseSharedBuffer

 **********************************************************************/

/* #define	BUFFERDEBUG	1	/* enable buffer debug messages */

#include "ipc.h"
#include "bufmgr.h"
#include "internal.h"
#include "buf_sync.h"
#include "log.h"
#include "rel.h"
#include "relcache.h"

#define FREE_HEAD -1

#define NO_NEXT_BUF -2
#define NO_PREV_BUF NO_NEXT_BUF

#define NOT_IN_FREE(sbufdesc) (sbufdesc->next == NO_NEXT_BUF && sbufdesc->prev == NO_PREV_BUF)

#define IN_FREE(sbufdesc) (sbufdesc->next != NO_NEXT_BUF && sbufdesc->prev != NO_PREV_BUF)

#define UNLINK(sbufdesc) ((sbufdesc->next = NO_NEXT_BUF) &&\
(sbufdesc->prev = NO_PREV_BUF))

#define PREV_BUF(sb) shared_bufdesc[sb->prev]
#define NEXT_BUF(sb) shared_bufdesc[sb->next]


#define BUF_PERMISSION 0700

/* local declarations - functions and vars */

static Sbufdesc *GetSharedBufferFromFree();
static void PutOnSharedFreeList();
void DecrSharedRefCount();
void IncrSharedRefCount();
void GetAndHoldBufSem();
void ReleaseBufSem();

static IpcMemoryId BufferPoolId = -1;
static IpcSemaphoreId BufferSemaphoreId = -1;
static char *SharedBufferPool = NULL;
static Sbufdesc *SharedFreeList;
static Sbufdesc *shared_bufdesc = NULL;
BlockData    *shared_bufdata;
static uint16 numSem = 0; /* Number of Buffer Semaphore Locks */

/**************************************************

  InitBufferSemaphore 
  - takes a key and makes a semaphore if 
    it doesn't already exist.

 **************************************************/

void 
InitBufferSemaphore(key)
     IPCKey key;
{
    int status;
    BufferSemaphoreId = IpcSemaphoreCreate(key,1,IPCProtection,BM_SEM_INIT,
					   &status);
    /* XXX check status */

} /* InitBufferSemaphore */

/**************************************************

  InitBufferPoolMemory

  makes the buffer pool if it does not
  already exist.
  pool consisting of buffer descriptors and
  their respective buffer data.

  the size of the buffer pool =
  sizeof(bufdesc)*4*NDBUFS +
  BLCKSZ*NDBUFS.
 
  **************************************************/
void
InitBufferPoolMemory(key,create)
     IPCKey key;
     bool create;
{
    if(create) {
	BufferPoolId = 
	  IpcMemoryCreate(key,
			  NDBUFS*BLCKSZ +
			  (NDBUFS+1)*sizeof(Sbufdesc)
			  ,BUF_PERMISSION);
    } else {
	BufferPoolId = IpcMemoryIdGet(key,0);
    }
	/* XXX check validity */
    if(BufferPoolId<0)
      elog(FATAL,"Unable to get shared buffers");
}

/**************************************************
  CreateBufferPoolMemory
  takes an IPCKey and attempts to create 
  and initialize the buffer

  NOTE: called only by Postmaster, and then 
        only *ONCE*
 **************************************************/

void
CreateBufferPoolMemory(key)
     IPCKey key;
{
    IpcMemoryKill(key);
    InitBufferPoolMemory(key,true);
    InitSharedBuffers(key);
} /* CreateBufferPoolMemory */


/**************************************************
  CreateBufferSemaphore
  - takes a key and makes a semaphore if it
    doesn't already exist.
  - if semaphore already exists, destroy first
 **************************************************/

void
CreateBufferSemaphore(key)
     IPCKey key;
{
  /* XXX should BM_debug if semaphore already exists ? */
  IpcSemaphoreKill(key);
  InitBufferSemaphore(key);
  /* XXX check status */
}

/**************************************************
  AttachBufferPoolMemory
  takes the shmid and returns a pointer
  to the memory location
 **************************************************/

void
AttachBufferPoolMemory(shmid)
     IpcMemoryId shmid;
{
    SharedBufferPool = IpcMemoryAttach(shmid);
    if(SharedBufferPool == IpcMemAttachFailed) {
	/* XXX use validity function */
	BM_debug(FATAL,"AttachBufferPoolMemory : could not attach segment");
    }
}

/**************************************************
  InitializeSharedBuffers
  does all the dirty work for attaching and 
  initializing the shared buffers.
  Should be called by 

  NOTES :
  1) uses static globals:
	 SharedBufferPool;
	 shared_bufdesc;
	 shared_bufdata;
	 SharedFreeList;

 **************************************************/
void
InitSharedBuffers(key)
     IPCKey key;
{
    register int i;
    
    AttachSharedBuffers(key);
    GetAndHoldBufSem();
    for (i = 0; i < NDBUFS; i++) {
	shared_bufdesc[i].next = i+1;
	shared_bufdesc[i].prev = i-1;
	shared_bufdesc[i].reln_oid.relId = InvalidObjectId;
	shared_bufdesc[i].reln_oid.dbId = InvalidObjectId;
	shared_bufdesc[i].data = i; /* always */
	shared_bufdesc[i].flags = BM_INIT;
	shared_bufdesc[i].refcount = 0;
    }
    SharedFreeList->next = 0;
    SharedFreeList->prev = NDBUFS-1;
    shared_bufdesc[0].prev = FREE_HEAD;
    shared_bufdesc[NDBUFS-1].next = FREE_HEAD;
    ReleaseBufSem();
    shared_buf_status();
}

void
AttachSharedBuffers(key)
     IPCKey key;
{
    if(BufferPoolId == -1)
      BM_debug(FATAL,"BufferPoolMemory not in shared memory");
    AttachBufferPoolMemory(BufferPoolId);
    GetAndHoldBufSem();
    SharedFreeList = (Sbufdesc *)SharedBufferPool;
    shared_bufdesc = &SharedFreeList[1]; /* hack */
    (Sbufdesc *)shared_bufdata = &shared_bufdesc[NDBUFS];
    ReleaseBufSem();
}

/*
 * Note:
 *	GetAndHoldBufSem may be called by ReadSharedBuffer called by
 *	ReadBuffer called by RelationGetBuffer called by ... called by
 *	RelationIdGetRelation called by ... called by ReadSharedBuffer
 *	called by ReadBuffer called by RelationGetBuffer called by ....
 *
 *	Thus, numSem may become as high as 2.
 */
void
GetAndHoldBufSem()
{
	if (numSem == 0) {
		IpcSemaphoreLock(BufferSemaphoreId, 0, BM_SEM_LOCK);

	} else if (numSem != 1) {

		if (numSem == 2) {
			ReleaseBufSem();
		}
		elog(FATAL, "GetAndHoldBufSem : called with %d locks", numSem);
	}

	numSem += 1;
}

void
ReleaseBufSem()
{
	if (numSem == 1) {
		IpcSemaphoreUnlock(BufferSemaphoreId, 0, BM_SEM_LOCK);

	} else if (numSem != 2) {
		elog(FATAL,"ReleaseBufSem: called with %d locks", numSem);
	}

	numSem -= 1;
}

#ifndef POSTMASTER
Sbufdesc 
*ReadSharedBuffer(reln,blknum)
     Relation reln;
     BlockNumber blknum;
{
    LRelId relOID;
    int fd = reln->rd_fd;
    int cc;
    register int i;
    Sbufdesc *sb;
    /* sigh, only way to find is sequential for now, until
       we put some fancy hashing in */
    relOID = RelationGetLRelId(reln);
    GetAndHoldBufSem();
    if(blknum != P_NEW) {

	/* try to find the buffer in shm */

	for (i=0;i<NDBUFS;i++) {
	    sb = &shared_bufdesc[i];
	    if(LtEqualsRelId(relOID,sb->reln_oid) && sb->blknum == blknum) {
		if(IN_FREE(sb)) {
		    /* XXX is on SharedFreeList */
		    PREV_BUF(sb).next = sb->next;
		    NEXT_BUF(sb).prev = sb->prev;
		    UNLINK(sb);
/*		    sb->prev->next = sb->next;
		    sb->next->prev = sb->prev;
		    sb->next = sb->prev = NULL;*/
		    if(sb->refcount)
		      BM_debug(DEBUG,"buffer with refcount > 0 on SharedFreeList");
		    sb->refcount = 1;
		} else {
					IncrSharedRefCount(sb);
		}
		ReleaseBufSem();
		return(sb); /* found it, so leave */
	    }
	} /* for */
	sb = NULL;

	/* unable to find in shm, so get free, then read into memory */

	if(i==NDBUFS) {
	    sb = GetSharedBufferFromFree();
	    if(sb==NULL)
	      elog(WARN,"null sharedbufdesc returned");
	    BM_debug(BUFDEB,"read: buffer not in shm, reading from disk");
	    if (FileSeek(fd, (long)blknum * BLCKSZ, L_SET) < 0)
	      BM_debug(FATAL, "lseek:%m");
	    if (cc = FileRead(fd, &shared_bufdata[sb->data], BLCKSZ)) {
		if (cc != BLCKSZ)
		  BM_debug(FATAL, "read:%m");
	    } else {				/* past eof? */
		bzero((char *)&shared_bufdata[sb->data], BLCKSZ);
		/* should we require blank blocks to be recognized? */
	    }
	    sb->blknum = blknum;
	} /* if unable to find in memory */

    } else {	/*blknum = P_NEW*/

	long	loc;
	/* P_NEW */
	sb=GetSharedBufferFromFree();
	if ((loc = FileSeek(fd, 0l, L_XTND)) < 0) {
	    ReleaseBufSem();
	    BM_debug(FATAL, "lseek:%m");
	}
	sb->blknum = loc / BLCKSZ;

	/* have to make sure it is in fact extended */
	bzero((char *)&shared_bufdata[sb->data], BLCKSZ);
	if (FileWrite(fd, (char *)&shared_bufdata[sb->data], BLCKSZ) < BLCKSZ){
	    ReleaseBufSem();
	    BM_debug(FATAL, "write:%m");
	}

    } 
    /* code common to P_NEW and couldn't find in memory */

    sb->reln_oid = relOID;
    sb->refcount = 1;
    sb->blksz = BLCKSZ ; /* will change eventually */
    sb->flags = BM_INIT; /* which is pinned */

    ReleaseBufSem();
    return(sb);
} /* ReadSharedBuffers */

/**************************************************

  WriteSharedBuffer
  - takes the shared bufdesc and depending
  on whether late-write is set, 
  will either do a FileWrite (thru Flush. . . )
  or just mark the buffer as 'dirty'

 **************************************************/

void
WriteSharedBuffer(buf)
     Sbufdesc *buf;
{
    GetAndHoldBufSem();
#ifndef LATEWRITE
    ReleaseBufSem();
    FlushSharedBuffer(buf);
    return;
#else
    buf->flags = BM_DIRTY; /* valid, but dirty */
    DecrSharedRefCount(buf);
    ReleaseBufSem();
#endif 

} /*WriteSharedBuffer*/


/**************************************************

  FlushSharedBuffer
  - takes the shared bufdesc and using
  the reln-OID stored with it writes out the
  buffer, then marks the buffer as clean 

 **************************************************/

void
FlushSharedBuffer(buf)
     Sbufdesc *buf;
{
    Relation temp_rel;
    File temp_fd;
    long nblks = buf->blknum;
	BM_debug(BUFDEB,"nblks = %d\n",nblks);
    GetAndHoldBufSem();
    temp_rel = RelationIdGetRelation(LRelIdGetRelationId(buf->reln_oid));
    temp_fd = RelationGetFile(temp_rel);
    if(FileSeek(temp_fd,(long)(BLCKSZ*nblks),L_SET) < 0)
      BM_debug(WARN,"FlushSharedBuffer : unable to lseek (no latewrite)");
    if(FileWrite(temp_fd,(char *)&shared_bufdata[buf->data],BLCKSZ) != BLCKSZ)
      BM_debug(WARN,"FlushSharedBuffer : unable to write");
    RelationDecrementReferenceCount(temp_rel);
/*  RelationFree(temp_rel);*/
    buf->flags = BM_INIT; /* at this pt, cannot be dirty, invalid etc, so */
    DecrSharedRefCount(buf);
    ReleaseBufSem();

} /* FlushSharedBuffer */

/**************************************************

  ReleaseSharedBuffer
  - decrements the ref_count on the buffer
  (which automatically frees it if it hits zero)

 **************************************************/

void
ReleaseSharedBuffer(buf)
     Sbufdesc *buf;
{
    GetAndHoldBufSem();
    DecrSharedRefCount(buf);
    ReleaseBufSem();
}
/**********************************************************************
  
  the following functions assume that the semaphore is already
  locked by the calling functions.

 **********************************************************************/

void
IncrSharedRefCount(shared)
     Sbufdesc *shared;
{
#ifdef	BUFFERDEBUG
	elog(DEBUG, "Incr(0x%x) [%d/%d]", shared, shared->refcount,
		IN_FREE(shared));
#endif

    /* XXX - should check that semaphore is already gotten */
    /* if on sharedfree_list, make sure
       to unlink */

	if (IN_FREE(shared)) {
		Assert(shared->refcount == 0);

		NEXT_BUF(shared).prev = shared->prev;
		PREV_BUF(shared).next = shared->next;
		shared->next = NO_NEXT_BUF;
		shared->prev = NO_PREV_BUF;
	}

	shared->refcount += 1;
}

void
DecrSharedRefCount(desc)
     Sbufdesc *desc;
{
#ifdef	BUFFERDEBUG
	elog(DEBUG, "Decr(0x%x) [%d/%d]", desc, desc->refcount, IN_FREE(desc));
#endif
	if (desc->refcount == 0) {
		if (IN_FREE(desc)) {
			elog(FATAL, "DecrSharedRefCount: on free buffer");
		}
		elog(WARN, "DecrSharedRefCount: zero refcount");
	}

	desc->refcount -= 1;

	if (IN_FREE(desc)) {
		elog(FATAL, "DecrSharedRefCount: pinned free buffer found");
	}
	if (desc->refcount == 0) {
		PutOnSharedFreeList(desc);
	}
}

/**************************************************
  PutOnSharedFreeList
  assumes that the semaphore is already held
  by this process

 **************************************************/
static void 
PutOnSharedFreeList(buf)
     Sbufdesc *buf;
{
    Assert(PointerIsValid(SharedFreeList));
    Assert(PointerIsValid(buf));
    BM_debug(BUFDEB,"Putting onto SharedFree\n");
    disp_shared_free();
    /* buf cannot already be on the shared free */
    if (IN_FREE(buf))
      BM_debug(DEBUG,"Trying to free already freed buffer");
    buf->prev = SharedFreeList->prev;
    buf->next = FREE_HEAD;
    PREV_BUF(SharedFreeList).next = buf-shared_bufdesc;
    SharedFreeList->prev = buf-shared_bufdesc;/*XXX - ugly*/
    disp_shared_free();
} /* PutOnSharedFreeList */

static Sbufdesc *
GetSharedBufferFromFree()
{
    Sbufdesc *new_buf;
    Assert(PointerIsValid(SharedFreeList));
    Assert(PointerIsValid
	   (&shared_bufdesc[SharedFreeList->next]));
    BM_debug(BUFDEB,"Getting Shared Free\n");
    shared_buf_status();
    if(SharedFreeList->next == FREE_HEAD) {
      /* out of space, time to throw out an old shared buf */
      BM_debug(WARN,"out of free buffers: time to abort !\n");
      return(NULL);
    }
    new_buf = &NEXT_BUF(SharedFreeList);
    PREV_BUF(new_buf).next = new_buf->next;
    NEXT_BUF(new_buf).prev = new_buf->prev;
    new_buf->next = NO_NEXT_BUF;
    new_buf->prev = NO_PREV_BUF;
#ifdef LATEWRITE
    if(new_buf->flags & BM_DIRTY) {
      Relation rel_temp;
      int   fd_temp;
      if(new_buf->refcount != 0)
	BM_debug(DEBUG,"GetSharedBuffer : Pinned buffer in free_list");
      rel_temp = (Relation)LRelIdGetRelation(new_buf->reln_oid);
      fd_temp = RelationGetFile(rel_temp);
      if(FileSeek(fd_temp,BLCKSZ*new_buf->blknum,L_SET) < 0)
	BM_debug(WARN,"GetSharedBufferFromFree : unable to FileSeek");
      if(FileWrite(fd_temp,&shared_bufdata[new_buf->data],BLCKSZ)!=BLCKSZ)
	BM_debug(WARN,"GetSharedBufferFromFree : unable to FileWrite");
      RelationDecrementReferenceCount(rel_temp);
      RelationFree(rel_temp); /* will actually only free if the
				 ref_count = 0 */
    }
#else
    if(new_buf->flags) 
      BM_debug(DEBUG,"unflushed buffer in free list");
#endif
    return(new_buf);
}

#endif /*POSTMASTER*/

disp_shared_free()
{
/*
  Sbufdesc *temp = SharedFreeList;
  while(temp->next != SharedFreeList) {
    printtf("%d,",temp);
    temp = temp->next;
  }
  printf("\n");
*/
}

static shared_buf_status ()
{
  char buf[NDBUFS*20];
  Sbufdesc *temp = NULL;
  register int i;
  static char *head = "BufferManagerStatus :\n shared_free_head: %u";
  Assert(PointerIsValid(SharedFreeList));
  Assert(PointerIsValid(shared_bufdesc));
  /*  if(((u_long)shared_bufdesc-(u_long)SharedFreeList ) !=
      sizeof(Sbufdesc))
      elog(DEBUG,"shared_buf_status : internal memory error.");
      elog(DEBUG,head,SharedFreeList);
      temp = &shared_bufdesc[SharedFreeList->next];
      i = 0;
      while(temp->next != FREE_HEAD){
    sprintf(buf+7*i,"%7u ",temp);
    i++;
    temp = &NEXT_BUF(temp);
    }
    */
}

    BM_debug(level,buf,p0,p1,p2,p3,p4,p5)
int level;
char *buf;
{
#ifdef BUFMGR_DEBUG    
    if(level==BUFDEB)
      level=DEBUG;
#else
    if(level==BUFDEB)
      return(0);
#endif
    elog(level,buf,p0,p1,p2,p3,p4,p5);
}

#ifndef	POSTMASTER
FlushDirtyShared()
{ 
    register int i;
    for(i=0;i<NDBUFS;i++)
	if(shared_bufdesc[i].flags == BM_DIRTY) {
		GetAndHoldBufSem();
		IncrSharedRefCount(&shared_bufdesc[i]);
		ReleaseBufSem();
		FlushSharedBuffer(&shared_bufdesc[i]);
	}
}
#endif	/* !defined(POSTMASTER) */
