head	1.23;
access;
symbols;
locks; strict;
comment	@ * @;


1.23
date	92.06.17.04.57.39;	author mer;	state Exp;
branches;
next	1.22;

1.22
date	92.05.28.21.12.39;	author mer;	state Exp;
branches;
next	1.21;

1.21
date	92.03.30.00.17.00;	author mer;	state Exp;
branches;
next	1.20;

1.20
date	92.02.25.21.39.14;	author mer;	state Exp;
branches;
next	1.19;

1.19
date	92.02.24.21.33.27;	author mer;	state Exp;
branches;
next	1.18;

1.18
date	91.11.14.19.39.58;	author kemnitz;	state Exp;
branches;
next	1.17;

1.17
date	91.11.08.15.45.30;	author kemnitz;	state Exp;
branches;
next	1.16;

1.16
date	91.08.29.23.52.03;	author mer;	state Exp;
branches;
next	1.15;

1.15
date	91.08.26.18.02.25;	author mer;	state Exp;
branches;
next	1.14;

1.14
date	91.08.14.22.05.19;	author mer;	state Exp;
branches;
next	1.13;

1.13
date	91.08.12.15.16.24;	author mer;	state Exp;
branches;
next	1.12;

1.12
date	91.08.03.00.59.52;	author mer;	state Exp;
branches;
next	1.11;

1.11
date	91.08.01.14.55.18;	author mer;	state Exp;
branches;
next	1.10;

1.10
date	91.07.29.16.52.13;	author mer;	state Exp;
branches;
next	1.9;

1.9
date	91.07.29.07.19.44;	author mer;	state Exp;
branches;
next	1.8;

1.8
date	91.07.23.20.30.30;	author mer;	state Exp;
branches;
next	1.7;

1.7
date	91.07.23.20.07.27;	author mer;	state Exp;
branches;
next	1.6;

1.6
date	91.07.23.19.29.39;	author mer;	state Exp;
branches;
next	1.5;

1.5
date	91.07.23.13.15.32;	author kemnitz;	state Exp;
branches;
next	1.4;

1.4
date	91.07.23.10.03.32;	author kemnitz;	state Exp;
branches;
next	1.3;

1.3
date	91.07.22.17.34.33;	author mer;	state Exp;
branches;
next	1.2;

1.2
date	91.07.22.14.22.57;	author mer;	state Exp;
branches;
next	1.1;

1.1
date	91.07.15.10.41.13;	author mer;	state Exp;
branches;
next	;


desc
@@


1.23
log
@add some debugging macros (got really tired adding trace statements to debug)
@
text
@/*
 * lock.c -- simple lock acquisition
 *
 *  Outside modules can create a lock table and acquire/release
 *  locks.  A lock table is a shared memory hash table.  When
 *  a process tries to acquire a lock of a type that conflicts
 *  with existing locks, it is put to sleep using the routines
 *  in storage/lmgr/proc.c.
 *
 *  Interface:
 *
 *  LockAcquire(), LockRelease(), LockTabInit().
 *
 *  LockReplace() is called only within this module and by the
 *  	lkchain module.  It releases a lock without looking
 * 	the lock up in the lock table.
 *
 *  NOTE: This module is used to define new lock tables.  The
 *	multi-level lock table (multi.c) used by the heap
 *	access methods calls these routines.  See multi.c for
 *	examples showing how to use this interface.
 *
 * $Header: /u/mer/pg/src/storage/lmgr/RCS/lock.c,v 1.22 1992/05/28 21:12:39 mer Exp $
 */
#include "storage/shmem.h"
#include "storage/spin.h"
#include "storage/proc.h"
#include "storage/lock.h"
#include "utils/hsearch.h"
#include "utils/log.h"
#include "access/xact.h"

/*
 * Debug tracing macros.
 *
 * Without LOCK_MGR_DEBUG both macros expand to nothing, so the trace
 * calls scattered through this file cost nothing in a normal build.
 * With LOCK_MGR_DEBUG they report through elog(DEBUG, ...).
 *
 * Each macro expands to one expression WITHOUT a trailing semicolon; the
 * call site supplies it.  That keeps "if (x) LOCK_PRINT(...); else ..."
 * legal in both builds.  Arguments are parenthesized in the expansion so
 * callers may pass expressions such as &lock->tag.
 */
#ifndef LOCK_MGR_DEBUG

#define LOCK_PRINT(where,tag,type)
#define XID_PRINT(where,xidentP)

#else /* LOCK_MGR_DEBUG */

/* print the relation, database, tuple id and lock type of a LOCKTAG */
#define LOCK_PRINT(where,tag,type)\
  elog(DEBUG, "%s: rel (%d) dbid (%d) tid (%d,%d) type (%d)\n",(where), \
	 (tag)->relId, (tag)->dbId, \
	 ( ((tag)->tupleId.blockData.data[0] >= 0) ? \
		BlockIdGetBlockNumber(&(tag)->tupleId.blockData) : -1 ), \
	 (tag)->tupleId.positionData, \
	 (type))

/* print an XIDLookupEnt; only holders[1] through holders[5] are shown */
#define XID_PRINT(where,xidentP)\
  elog(DEBUG,\
       "%s:xid (%d) pid (%d) lock (%x) nHolding (%d) holders (%d,%d,%d,%d,%d)",\
       (where),\
       (xidentP)->tag.xid,\
       (xidentP)->tag.pid,\
       (xidentP)->tag.lock,\
       (xidentP)->nHolding,\
       (xidentP)->holders[1],\
       (xidentP)->holders[2],\
       (xidentP)->holders[3],\
       (xidentP)->holders[4],\
       (xidentP)->holders[5])

#endif /* LOCK_MGR_DEBUG */

/*
 * Forward declaration of LockTypeInit.  The function is defined 'static'
 * further down, so the declaration must be 'static' as well: a non-static
 * declaration followed by a static definition of the same function is a
 * linkage conflict in ANSI C.
 */
static void LockTypeInit ARGS((
	LOCKTAB *ltable, 
	MASK *conflictsP, 
	int *prioP, 
	int ntypes
));

extern int MyPid;		/* For parallel backends w/same xid */
SPINLOCK LockMgrLock;		/* in Shmem or created in CreateSpinlocks() */

/*
 * This is to simplify/speed up some bit arithmetic.  InitLocks() fills
 * both arrays: BITS_ON[i] has only bit i set and BITS_OFF[i] is its
 * complement.  GrantLock() ORs BITS_ON[lockt] into a lock's mask; the
 * release routines AND the mask with BITS_OFF[lockt].
 */

static MASK	BITS_OFF[MAX_LOCKTYPES];
static MASK	BITS_ON[MAX_LOCKTYPES];

/* -----------------
 * Set by LockDisable(), read by LockingDisabled().  While it is set,
 * LockAcquire() and LockRelease() return TRUE without touching the
 * lock tables.
 * (The older note here read: "XXX Want to move this to this file".)
 * -----------------
 */
static bool LockingIsDisabled;

/* ------------------
 * from storage/ipc/shmem.c
 * ------------------
 */
extern HTAB *ShmemInitHash();

/* -------------------
 * map from tableId to the lock table structure.  Entries are filled in
 * by LockTabInit() and LockTabRename(); NumTables is the next free slot.
 * -------------------
 */
LOCKTAB *AllTables[MAX_TABLES];

/* -------------------
 * no zero-th table
 * -------------------
 */
int	NumTables = 1;

/* -------------------
 * InitLocks -- Init the lock module.  Fills the private BITS_ON and
 *	BITS_OFF tables used when building and clearing conflict masks:
 *	entry i of BITS_ON has only bit i set, entry i of BITS_OFF is
 *	the complement of that.
 * -------------------
 */
void
InitLocks()
{
  int lockt = 0;
  int bit = 1;

  /* -------------------
   * the 0th locktype is invalid, but its slot is filled in anyway
   * -------------------
   */
  while (lockt < MAX_LOCKTYPES)
  {
    BITS_ON[lockt] = bit;
    BITS_OFF[lockt] = ~bit;

    bit <<= 1;
    lockt++;
  }
}

/* -------------------
 * LockDisable -- store 'status' in the LockingIsDisabled flag.
 *
 *	Pass TRUE to disable locking and FALSE to enable it again.  While
 *	the flag is set, LockAcquire() and LockRelease() return TRUE
 *	right after validating the table id.
 * ------------------
 */
void
LockDisable(status)
int status;
{
  LockingIsDisabled = status;
  return;
}


/*
 * LockTypeInit -- initialize the lock table's lock type
 *	structures
 *
 * Notes: just copying.  Should only be called once.
 */
static
void
LockTypeInit(ltable, conflictsP, prioP, ntypes)
LOCKTAB	*ltable;
MASK	*conflictsP;
int	*prioP;
int 	ntypes;
{
  int	i;

  ltable->ctl->nLockTypes = ntypes;
  ntypes++;
  for (i=0;i<ntypes;i++,prioP++,conflictsP++)
  {
    ltable->ctl->conflictTab[i] = *conflictsP;
    ltable->ctl->prio[i] = *prioP;
  }
}

/*
 * LockTabInit -- initialize a lock table structure
 *
 * Notes:
 *	(a) a lock table has four separate entries in the binding
 *	table.  This is because every shared hash table and spinlock
 *	has its name stored in the binding table at its creation.  It
 *	is wasteful, in this case, but not much space is involved.
 *
 */
LockTableId
LockTabInit(tabName, conflictsP, prioP, ntypes)
char *tabName;
MASK	*conflictsP;
int	*prioP;
int 	ntypes;
{
  LOCKTAB *ltable;
  char *shmemName;
  HASHCTL info;
  int hash_flags;
  Boolean	found;
  int status = TRUE;

  if (ntypes > MAX_LOCKTYPES)
  {
    elog(NOTICE,"LockTabInit: too many lock types %d greater than %d",
	 ntypes,MAX_LOCKTYPES);
    return(INVALID_TABLEID);
  }

  if (NumTables > MAX_TABLES)
  {
    elog(NOTICE,
	 "LockTabInit: system limit of MAX_TABLES (%d) lock tables",
	 MAX_TABLES);
    return(INVALID_TABLEID);
  }

  /* allocate a string for the binding table lookup */
  shmemName = (char *) palloc((unsigned)(strlen(tabName)+32));
  if (! shmemName)
  {
    elog(NOTICE,"LockTabInit: couldn't malloc string %s \n",tabName);
    return(INVALID_TABLEID);
  }

  /* each lock table has a non-shared header */
  ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB));
  if (! ltable)
  {
    elog(NOTICE,"LockTabInit: couldn't malloc lock table %s\n",tabName);
    (void) pfree (shmemName);
    return(INVALID_TABLEID);
  }

  /* ------------------------
   * find/acquire the spinlock for the table
   * ------------------------
   */
   SpinAcquire(LockMgrLock);


  /* -----------------------
   * allocate a control structure from shared memory or attach to it
   * if it already exists.
   * -----------------------
   */
  sprintf(shmemName,"%s (ctl)",tabName);
  ltable->ctl = (LOCKCTL *)
    ShmemInitStruct(shmemName,(unsigned)sizeof(LOCKCTL),&found);

  if (! ltable->ctl)
  {
    elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
    status = FALSE;
  }

  /* ----------------
   * we're first - initialize
   * ----------------
   */
  if (! found)
  {
    bzero(ltable->ctl,sizeof(LOCKCTL));
    ltable->ctl->masterLock = LockMgrLock;
    ltable->ctl->tableId = NumTables;
  }

  /* --------------------
   * other modules refer to the lock table by a tableId
   * --------------------
   */
  AllTables[NumTables] = ltable;
  NumTables++;
  Assert(NumTables <= MAX_TABLES);

  /* ----------------------
   * allocate a hash table for the lock tags.  This is used
   * to find the different locks.
   * ----------------------
   */
  info.keysize =  sizeof(LOCKTAG);
  info.datasize = sizeof(LOCK);
  info.hash = tag_hash;
  hash_flags = (HASH_ELEM | HASH_FUNCTION);

  sprintf(shmemName,"%s (lock hash)",tabName);
  ltable->lockHash = (HTAB *) ShmemInitHash(shmemName,
		  INIT_TABLE_SIZE,MAX_TABLE_SIZE,
		  &info,hash_flags);

  Assert( ltable->lockHash->hash == tag_hash);
  if (! ltable->lockHash)
  {
    elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
    status = FALSE;
  }

  /* -------------------------
   * allocate an xid table.  When different transactions hold
   * the same lock, additional information must be saved (locks per tx).
   * -------------------------
   */
  info.keysize = XID_TAGSIZE;
  info.datasize = sizeof(XIDLookupEnt);
  info.hash = tag_hash;
  hash_flags = (HASH_ELEM | HASH_FUNCTION);

  sprintf(shmemName,"%s (xid hash)",tabName);
  ltable->xidHash = (HTAB *) ShmemInitHash(shmemName,
		  INIT_TABLE_SIZE,MAX_TABLE_SIZE,
		  &info,hash_flags);

  if (! ltable->xidHash)
  {
    elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
    status = FALSE;
  }

  /* init ctl data structures */
  LockTypeInit(ltable, conflictsP, prioP, ntypes);

  SpinRelease(LockMgrLock);

  (void) pfree (shmemName);

  if (status)
    return(ltable->ctl->tableId);
  else
    return(INVALID_TABLEID);
}

/*
 * LockTabRename -- allocate another tableId to the same
 *	lock table.
 *
 * Arguments:
 *	tableId - id of an existing lock table
 *
 * Returns: the new table id, or INVALID_TABLEID if AllTables is full or
 *	'tableId' does not name an existing table.
 *
 * NOTES: Both the lock module and the lock chain (lchain.c)
 *	module use table id's to distinguish between different
 *	kinds of locks.  Short term and long term locks look
 *	the same to the lock table, but are handled differently
 *	by the lock chain manager.  This function allows the
 *	client to use different tableIds when acquiring/releasing
 *	short term and long term locks.
 */
LockTableId
LockTabRename(tableId)
LockTableId	tableId;
{
  LockTableId	newTableId;

  if (NumTables >= MAX_TABLES)
  {
    return(INVALID_TABLEID);
  }

  /*
   * Reject ids at or past the next free slot before indexing AllTables,
   * and test the slot as the pointer it is (the same "!ltable" test that
   * LockAcquire() and LockRelease() use).
   */
  if (tableId >= NumTables || ! AllTables[tableId])
  {
    return(INVALID_TABLEID);
  }

  /* other modules refer to the lock table by a tableId */
  newTableId = NumTables;
  NumTables++;

  AllTables[newTableId] = AllTables[tableId];
  return(newTableId);
}

/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *	set lock if/when no conflicts.
 *
 * Arguments:
 *	tableId  - id of the lock table (index into AllTables)
 *	lockName - tag identifying the object to lock
 *	lockt    - type of lock wanted
 *
 * Returns: TRUE if the lock was granted (or locking is disabled),
 *	FALSE if the table id maps to no table, a hash table lookup
 *	fails, or conflict resolution reports an error.
 *
 * Side Effects: The lock is always acquired.  No way to abort
 *	a lock acquisition other than aborting the transaction.
 *	The table's master spinlock is taken on entry and released on
 *	every return path.
 */
bool
LockAcquire(tableId, lockName, lockt)
LockTableId		tableId;
LOCKTAG		*lockName;
LOCKT		lockt;
{
  XIDLookupEnt	*result,item;
  HTAB		*xidTable;
  Boolean	found;
  LOCK		*lock = NULL;
  SPINLOCK 	masterLock;
  LOCKTAB 	*ltable;
  int 		status;
  TransactionId	myXid;

  Assert (tableId < NumTables);
  ltable = AllTables[tableId];
  if (!ltable)
  {
    elog(NOTICE,"LockAcquire: bad lock table %d",tableId);
    return  (FALSE);
  }

  if (LockingIsDisabled)
  {
    return(TRUE);
  }

  LOCK_PRINT("Acquire",lockName,lockt);
  masterLock = ltable->ctl->masterLock;

  SpinAcquire(masterLock);

  /* find the lock's entry, creating it if this is the first request */
  Assert( ltable->lockHash->hash == tag_hash);
  lock = (LOCK *)hash_search(ltable->lockHash,(Pointer)lockName,HASH_ENTER,&found);

  if (! lock)
  {
    SpinRelease(masterLock);
    elog(FATAL,"LockAcquire: lock table %d is corrupted",tableId);
    return(FALSE);
  }

  /* --------------------
   * if there was nothing else there, complete initialization
   * --------------------
   */
  if  (! found)
  {
    lock->mask = 0;
    ProcQueueInit(&(lock->waitProcs));
    bzero((char *)lock->holders,sizeof(int)*MAX_LOCKTYPES);
    bzero((char *)lock->activeHolders,sizeof(int)*MAX_LOCKTYPES);
    lock->nHolding = 0;
    lock->nActive = 0;

    Assert ( BlockIdEquals(&(lock->tag.tupleId.blockData), &(lockName->tupleId.blockData)) );

  }

  /* ------------------
   * add an element to the lock queue so that we can clear the
   * locks at end of transaction.
   * ------------------
   */
  xidTable = ltable->xidHash;
  myXid = GetCurrentTransactionId();

  /* ------------------
   * Zero out all of the tag bytes (this clears the padding bytes for long
   * word alignment and ensures hashing consistency).
   * ------------------
   */
  bzero(&item, XID_TAGSIZE);
  TransactionIdStore(myXid, &item.tag.xid);
  item.tag.lock = MAKE_OFFSET(lock);
  item.tag.pid = MyPid;

  result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
  if (!result)
  {
    /*
     * Drop the spinlock before reporting, like every other failure
     * path here, and return FALSE: this function returns bool, so a
     * STATUS_xxx code must not leak out to the caller.
     */
    SpinRelease(masterLock);
    elog(NOTICE,"LockAcquire: xid table corrupted");
    return(FALSE);
  }
  if (!found)
  {
    /* first hold by this xid/pid on this lock: start a fresh entry */
    XID_PRINT("queueing XidEnt LockAcquire:", result);
    ProcAddLock(&result->queue);
    result->nHolding = 0;
    bzero((char *)result->holders,sizeof(int)*MAX_LOCKTYPES);
  }

  /* ----------------
   * lock->nholding tells us how many processes have _tried_ to
   * acquire this lock,  Regardless of whether they succeeded or
   * failed in doing so.
   * ----------------
   */
  lock->nHolding++;
  lock->holders[lockt]++;

  /* --------------------
   * If the number of holds recorded in my xid entry equals the
   * number of granted holds on the lock (nActive), every granted
   * hold is mine, so there cannot be a conflict.
   * --------------------
   */
  if (result->nHolding == lock->nActive)
  {
    result->holders[lockt]++;
    result->nHolding++;
    GrantLock(lock, lockt);
    SpinRelease(masterLock);
    return(TRUE);
  }

  Assert(result->nHolding <= lock->nActive);

  status = LockResolveConflicts(ltable, lock, lockt, myXid, (int) MyPid);

  if (status == STATUS_OK)
  {
    GrantLock(lock, lockt);
  }
  else if (status == STATUS_FOUND)
  {
    /* conflict: sleep until woken; WaitOnLock yields STATUS_OK */
    status = WaitOnLock(ltable, tableId, lock, lockt);
    XID_PRINT("Someone granted me the lock", result);
  }
  /*
   * NOTE(review): for any other status the nHolding/holders[] bump made
   * above is not undone here -- confirm whether that is intended.
   */

  SpinRelease(masterLock);

  return(status == STATUS_OK);
}

/* ----------------------------
 * LockResolveConflicts -- test for lock conflicts
 *
 * NOTES:
 * 	Here's what makes this complicated: one transaction's
 * locks don't conflict with one another.  When many processes
 * hold locks, each has to subtract off the other's locks when
 * determining whether or not any new lock acquired conflicts with
 * the old ones.
 *
 *  For example, if I am already holding a WRITE_INTENT lock,
 *  there will not be a conflict with my own READ_LOCK.  If I
 *  don't consider the intent lock when checking for conflicts,
 *  I find no conflict.
 * ----------------------------
 */
LockResolveConflicts(ltable,lock,lockt,xid,pid)
LOCKTAB	*ltable;
LOCK	*lock;
LOCKT	lockt;
TransactionId	xid;
int pid;
{
  XIDLookupEnt	*result,item;
  int		*myHolders;
  int		nLockTypes;
  HTAB		*xidTable;
  Boolean	found;
  int		bitmask;
  int 		i,tmpMask;

  nLockTypes = ltable->ctl->nLockTypes;
  xidTable = ltable->xidHash;

  /* ---------------------
   * read my own statistics from the xid table.  If there
   * isn't an entry, then we'll just add one.
   *
   * Zero out the tag, this clears the padding bytes for long
   * word alignment and ensures hashing consistency.
   * ------------------
   */
  bzero(&item, XID_TAGSIZE);
  TransactionIdStore(xid, &item.tag.xid);
  item.tag.lock = MAKE_OFFSET(lock);
  item.tag.pid = pid;

  if (! (result = (XIDLookupEnt *)
	 hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found)))
  {
    elog(NOTICE,"LockResolveConflicts: xid table corrupted");
    return(STATUS_ERROR);
  }
  myHolders = result->holders;

  if (! found)
  {
    /* ---------------
     * we're not holding any type of lock yet.  Clear
     * the lock stats.
     * ---------------
     */
    bzero(result->holders,
	  nLockTypes*sizeof(* (lock->holders)));
    result->nHolding = 0;
  }

  /* ----------------------------
   * first check for global conflicts: If no locks conflict
   * with mine, then I get the lock.
   *
   * Checking for conflict: lock->mask represents the types of
   * currently held locks.  conflictTable[lockt] has a bit
   * set for each type of lock that conflicts with mine.  Bitwise
   * compare tells if there is a conflict.
   * ----------------------------
   */
  if (! (ltable->ctl->conflictTab[lockt] & lock->mask))
  {

    result->holders[lockt]++;
    result->nHolding++;

    XID_PRINT("Conflict Resolved: updated xid entry stats", result);

    return(STATUS_OK);
  }

  /* ------------------------
   * Rats.  Something conflicts. But it could still be my own
   * lock.  We have to construct a conflict mask
   * that does not reflect our own locks.
   * ------------------------
   */
  bitmask = 0;
  tmpMask = 2;
  for (i=1;i<=nLockTypes;i++, tmpMask <<= 1)
  {
    if (lock->activeHolders[i] - myHolders[i])
    {
      bitmask |= tmpMask;
    }
  }

  /* ------------------------
   * now check again for conflicts.  'bitmask' describes the types
   * of locks held by other processes.  If one of these
   * conflicts with the kind of lock that I want, there is a
   * conflict and I have to sleep.
   * ------------------------
   */
  if (! (ltable->ctl->conflictTab[lockt] & bitmask))
  {

    /* no conflict. Get the lock and go on */

    result->holders[lockt]++;
    result->nHolding++;

    XID_PRINT("Conflict Resolved: updated xid entry stats", result);

    return(STATUS_OK);

  }

  return(STATUS_FOUND);
}

int
WaitOnLock(ltable, tableId, lock,lockt)
LOCKTAB		*ltable;
LOCK 		*lock;
LockTableId		tableId;
LOCKT		lockt;
{
  PROC_QUEUE *waitQueue = &(lock->waitProcs);

  int prio = ltable->ctl->prio[lockt];

  /* the waitqueue is ordered by priority. I insert myself
   * according to the priority of the lock I am acquiring.
   *
   * SYNC NOTE: I am assuming that the lock table spinlock
   * is sufficient synchronization for this queue.  That
   * will not be true if/when people can be deleted from
   * the queue by a SIGINT or something.
   */
  LOCK_PRINT("WaitOnLock: sleeping on lock", (&lock->tag), lockt);
  if (ProcSleep(waitQueue,
		ltable->ctl->masterLock,
                lockt,
                prio,
                lock) != NO_ERROR)
  {
    /* -------------------
     * This could have happend as a result of a deadlock, see HandleDeadLock()
     * Decrement the lock nHolding and holders fields as we are no longer 
     * waiting on this lock.
     * -------------------
     */
    lock->nHolding--;
    lock->holders[lockt]--;
    SpinRelease(ltable->ctl->masterLock);
    elog(WARN,"WaitOnLock: error on wakeup - Aborting this transaction");

    /* -------------
     * There is no return from elog(WARN, ...)
     * -------------
     */
  }

  return(STATUS_OK);
}

/*
 * LockRelease -- look up 'lockName' in lock table 'tableId' and
 *	release it.
 *
 * Arguments:
 *	tableId  - id of the lock table (index into AllTables)
 *	lockName - tag of the lock to release
 *	lockt    - type of lock being released
 *
 * Returns: TRUE on success (or when locking is disabled), FALSE if the
 *	table id maps to no table, the lock is not in the lock table, or
 *	the caller's entry is not in the xid table.
 *
 * Side Effects: if the lock no longer conflicts with the highest
 *	priority waiting process, that process is granted the lock
 *	and awoken. (We have to grant the lock here to avoid a
 *	race between the waking process and any new process to
 *	come along and request the lock).
 *
 * NOTE(review): several messages below are labelled "LockReplace"
 *	although they are issued from LockRelease.
 */
bool
LockRelease(tableId, lockName, lockt)
LockTableId	tableId;
LOCKTAG	*lockName;
LOCKT	lockt;
{
  LOCK		*lock = NULL;
  SPINLOCK 	masterLock;
  Boolean	found;
  LOCKTAB 	*ltable;
  int		status;		/* NOTE(review): never used in this function */
  XIDLookupEnt	*result,item;
  HTAB 		*xidTable;
  bool		wakeupNeeded = true;

  Assert (tableId < NumTables);
  ltable = AllTables[tableId];
  if (!ltable) {
    elog(NOTICE, "ltable is null in LockRelease");
    return (FALSE);
  }

  if (LockingIsDisabled)
  {
    return(TRUE);
  }

  LOCK_PRINT("Release",lockName,lockt);

  masterLock = ltable->ctl->masterLock;
  xidTable = ltable->xidHash;

  SpinAcquire(masterLock);

  /*
   * HASH_FIND_SAVE is paired with the HASH_REMOVE_SAVED call further
   * down -- presumably the lookup position is remembered so the removal
   * need not search again; verify against the hash module.
   */
  Assert( ltable->lockHash->hash == tag_hash);
  lock = (LOCK *)
    hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);

  /* let the caller print its own error message, too.
   * Do not elog(WARN).
   */
  if (! lock)
  {
    SpinRelease(masterLock);
    elog(NOTICE,"LockRelease: locktable corrupted");
    return(FALSE);
  }

  if (! found)
  {
    SpinRelease(masterLock);
    elog(NOTICE,"LockRelease: locktable lookup failed, no lock");
    return(FALSE);
  }

  Assert(lock->nHolding > 0);

  /*
   * fix the general lock stats
   */
  lock->nHolding--;
  lock->holders[lockt]--;
  lock->nActive--;
  lock->activeHolders[lockt]--;

  Assert(lock->nActive >= 0);

  if (! lock->nHolding)
  {
    /* ------------------
     * if there's no one waiting in the queue,
     * we just released the last lock.
     * Delete it from the lock table.
     *
     * NOTE(review): 'lock' is reassigned from the removal call and is
     * still read and written below (MAKE_OFFSET, activeHolders, mask);
     * this relies on the removed entry remaining addressable.
     * ------------------
     */
    Assert( ltable->lockHash->hash == tag_hash);
    lock = (LOCK *) hash_search(ltable->lockHash,
				(Pointer) &(lock->tag),
				HASH_REMOVE_SAVED,
				&found);
    Assert(lock && found);
    wakeupNeeded = false;	/* nobody is left who could be woken */
  }

  /* ------------------
   * Zero out all of the tag bytes (this clears the padding bytes for long
   * word alignment and ensures hashing consistency).
   * ------------------
   */
  bzero(&item, XID_TAGSIZE);

  /* build the same xid/lock/pid key that LockAcquire() entered */
  TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
  item.tag.lock = MAKE_OFFSET(lock);
  item.tag.pid = MyPid;

  if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
						(Pointer)&item,
						HASH_FIND_SAVE,
						&found) )
	 || !found)
  {
    SpinRelease(masterLock);
    elog(NOTICE,"LockReplace: xid table corrupted");
    return(FALSE);
  }
  /*
   * now check to see if I have any private locks.  If I do,
   * decrement the counts associated with them.
   */
  result->holders[lockt]--;
  result->nHolding--;

  XID_PRINT("LockRelease updated xid stats", result);

  /*
   * If this was my last hold on this lock, delete my entry
   * in the XID table.
   */
  if (! result->nHolding)
  {
    /* unlink the entry from its queue before removing it from the table */
    SHMQueueDelete(&result->queue);
    if (! (result = (XIDLookupEnt *)
	   hash_search(xidTable, (Pointer)&item, HASH_REMOVE_SAVED, &found)) ||
	! found)
    {
      SpinRelease(masterLock);
      elog(NOTICE,"LockReplace: xid table corrupted");
      return(FALSE);
    }
  }

  /* --------------------------
   * If there are still active locks of the type I just released, no one
   * should be woken up.  Whoever is asleep will still conflict
   * with the remaining locks.
   * --------------------------
   */
  if (! (lock->activeHolders[lockt]))
  {
    /* change the conflict mask.  No more of this lock type. */
    lock->mask &= BITS_OFF[lockt];
  }

  if (wakeupNeeded)
  {
    /* --------------------------
     * Wake the first waiting process and grant him the lock if it
     * doesn't conflict.  The woken process must record the lock
     * himself.
     * --------------------------
     */
    (void) ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
  }

  SpinRelease(masterLock);
  return(TRUE);
}

/*
 * GrantLock -- update the lock data structure to show
 *	the new lock holder: set the bit for 'lockt' in the lock's
 *	mask and count one more active holder, both for that lock
 *	type and overall.
 */
void
GrantLock(lock, lockt)
LOCK 	*lock;
LOCKT	lockt;
{
  lock->mask |= BITS_ON[lockt];
  lock->activeHolders[lockt] += 1;
  lock->nActive += 1;
}

/*
 * LockReleaseAll -- release every lock recorded on 'lockQueue'.
 *
 * Arguments:
 *	tableId   - id of the lock table (index into AllTables)
 *	lockQueue - queue of XIDLookupEnt entries, linked through their
 *		    'queue' field
 *
 * For each entry on the queue, its counts are subtracted from the lock
 * it refers to and the entry is removed from the xid hash table.  A lock
 * left with no holders is removed from the lock hash table; otherwise
 * ProcLockWakeup() is called on its wait queue.
 *
 * Returns: TRUE when the queue is empty or has been fully processed (the
 *	queue is then re-initialized), FALSE if the table id maps to no
 *	table or a hash table removal fails.
 *
 * NOTE(review): the LockingIsDisabled flag is not consulted here, and
 *	the messages below are labelled "LockReplace".
 */
bool
LockReleaseAll(tableId,lockQueue)
LockTableId		tableId;
SHM_QUEUE	*lockQueue;
{
  PROC_QUEUE 	*waitQueue;
  int		done;
  XIDLookupEnt	*xidLook = NULL;
  XIDLookupEnt	*tmp = NULL;
  SHMEM_OFFSET 	end = MAKE_OFFSET(lockQueue);
  SPINLOCK 	masterLock;
  LOCKTAB 	*ltable;
  int		i,nLockTypes;
  LOCK		*lock;
  Boolean	found;

  Assert (tableId < NumTables);
  ltable = AllTables[tableId];
  if (!ltable)
    return (FALSE);

  nLockTypes = ltable->ctl->nLockTypes;
  masterLock = ltable->ctl->masterLock;

  if (SHMQueueEmpty(lockQueue))
    return TRUE;

  /*
   * xidLook is still NULL here, so the third argument is presumably
   * just the offset of 'queue' within XIDLookupEnt -- verify against
   * SHMQueueFirst.  The call sets xidLook to the first queue entry.
   */
  SHMQueueFirst(lockQueue,&xidLook,&xidLook->queue);

  XID_PRINT("LockReleaseAll:", xidLook);

  SpinAcquire(masterLock);
  for (;;)
  {
    /* ---------------------------
     * XXX Here we assume the shared memory queue is circular and
     * that we know its internal structure.  Should have some sort of
     * macros to allow one to walk it.  mer 20 July 1991
     * ---------------------------
     */
    /* decide whether this is the last entry before it is removed below */
    done = (xidLook->queue.next == end);
    lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);

    LOCK_PRINT("ReleaseAll",(&lock->tag),0);

    /* ------------------
     * fix the general lock stats
     * ------------------
     */
    if (lock->nHolding != xidLook->nHolding)
    {
      /* other entries also hold this lock: subtract only this entry's share */
      lock->nHolding -= xidLook->nHolding;
      lock->nActive -= xidLook->nHolding;
      Assert(lock->nActive >= 0);
      for (i=1; i<=nLockTypes; i++)
      {
	lock->holders[i] -= xidLook->holders[i];
	lock->activeHolders[i] -= xidLook->holders[i];
	if (! lock->activeHolders[i])
	  lock->mask &= BITS_OFF[i];
      }
    }
    else
    {
      /* --------------
       * set nHolding to zero so that we can garbage collect the lock
       * down below...
       * --------------
       */
      lock->nHolding = 0;
    }
    /* ----------------
     * always remove the xidLookup entry, we're done with it now
     * ----------------
     */
    if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
        || !found)
    {
      SpinRelease(masterLock);
      elog(NOTICE,"LockReplace: xid table corrupted");
      return(FALSE);
    }

    if (! lock->nHolding)
    {
      /* --------------------
       * if there's no one waiting in the queue, we've just released
       * the last lock.
       * --------------------
       */

      Assert( ltable->lockHash->hash == tag_hash);
      lock = (LOCK *)
	hash_search(ltable->lockHash,(Pointer)&(lock->tag),HASH_REMOVE, &found);
      if ((! lock) || (!found))
      {
	SpinRelease(masterLock);
	elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
	return(FALSE);
      }
    }
    else
    {
      /* --------------------
       * Wake the first waiting process and grant him the lock if it
       * doesn't conflict.  The woken process must record the lock
       * him/herself.
       * --------------------
       */
      waitQueue = &(lock->waitProcs);
      (void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
    }

    if (done)
      break;
    /*
     * Step to the next entry.  NOTE(review): this reads xidLook->queue
     * after xidLook was removed from the xid hash table above.
     */
    SHMQueueFirst(&xidLook->queue,&tmp,&tmp->queue);
    xidLook = tmp;
  }
  SpinRelease(masterLock);
  /* every entry has been dealt with; reset the caller's queue to empty */
  SHMQueueInit(lockQueue);
  return TRUE;
}

/*
 * LockShmemSize -- estimate the number of bytes of shared memory the
 *	lock manager needs.
 *
 * The estimate adds up the LOCK entries, the XIDLookupEnt and PROC
 * entries, the control structures, and for each of the two hash tables
 * (lock hash and xid hash) its segments and header.
 *
 * Bucket and segment counts are rounded up to a power of two with
 * my_log2().  The xid segment count is derived from the xid bucket
 * count; it was previously computed from the lock bucket count, which
 * left nXidBuckets unused.
 */
int
LockShmemSize()
{
	int size;
	int nLockBuckets, nLockSegs;
	int nXidBuckets, nXidSegs;

	/* lock hash table: buckets, then segments holding those buckets */
	nLockBuckets = 1 << (int)my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
	nLockSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);

	/* xid hash table: same computation on its own bucket count */
	nXidBuckets = 1 << (int)my_log2((NLOCKS_PER_XACT-1) / DEF_FFACTOR + 1);
	nXidSegs = 1 << (int)my_log2((nXidBuckets - 1) / DEF_SEGSIZE + 1);

	size =	NLOCKENTS*sizeof(LOCK) +
		NBACKENDS*sizeof(XIDLookupEnt) +
		NBACKENDS*sizeof(PROC) +
		sizeof(LOCKCTL) +
		sizeof(PROC_HDR) +
		nLockSegs*DEF_SEGSIZE*sizeof(SEGMENT) +
		my_log2(NLOCKENTS) + sizeof(HHDR) +
		nXidSegs*DEF_SEGSIZE*sizeof(SEGMENT) +
		my_log2(NBACKENDS) + sizeof(HHDR);

	return size;
}

/* -----------------
 * LockingDisabled -- report the current locking status: returns the
 *	value of the LockingIsDisabled flag, as last set by LockDisable().
 * -----------------
 */
bool
LockingDisabled()
{
	return (LockingIsDisabled);
}
@


1.22
log
@transaction ids are now longs, needed some casts (size change might
cause problems for lmgr routines and the hash tags
@
text
@d23 1
a23 1
 * $Header: /private/mer/pg.latest/src/storage/lmgr/RCS/lock.c,v 1.21 1992/03/30 00:17:00 mer Exp mer $
d33 31
d447 1
a487 4
    /* -----------------
     * nHolding tells how many have tried to acquire the lock.
     *------------------
     */
d489 1
d576 1
d580 2
d615 3
d793 2
d881 2
@


1.21
log
@take advantage of ability to save and remove hash state
@
text
@d23 1
a23 1
 * $Header: /users/mer/pg/src/storage/lmgr/RCS/lock.c,v 1.20 1992/02/25 21:39:14 mer Exp mer $
d404 1
a404 1
  TransactionIdStore(myXid, (char *) &item.tag.xid);
d511 1
a511 1
  TransactionIdStore(xid, (char *) &item.tag.xid);
d737 1
a737 1
  TransactionIdStore(GetCurrentTransactionId(), (char *) &item.tag.xid);
@


1.20
log
@make purify happy by initializing pointers
@
text
@d23 1
a23 1
 * $Header: /users/mer/pg/src/storage/lmgr/RCS/lock.c,v 1.19 1992/02/24 21:33:27 mer Exp mer $
d659 1
d682 1
a682 1
    hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND,&found);
d713 17
d741 5
a745 2
  if (! (result = (XIDLookupEnt *)
	 hash_search(xidTable, (Pointer)&item, HASH_FIND, &found))|| !found)
d766 1
a766 1
	   hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found)) ||
a774 22

  if (! lock->nHolding)
  {
    /* ------------------
     * if there's no one waiting in the queue,
     * we just released the last lock.
     * Delete it from the lock table.
     * ------------------
     */
    Assert( ltable->lockHash->hash == tag_hash);
    lock = (LOCK *)
      hash_search(ltable->lockHash,(Pointer)&(lock->tag),HASH_REMOVE, &found);
    SpinRelease(masterLock);

    if ((! lock) || (!found))
    {
      elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
      return(FALSE);
    }
    return(TRUE);
  }

d787 10
a796 7
  /* --------------------------
   * Wake the first waiting process and grant him the lock if it
   * doesn't conflict.  The woken process must record the lock
   * himself.
   * --------------------------
   */
  (void) ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
@


1.19
log
@allocate enough space for subsequent sprintf's (10 no enough)
@
text
@d23 1
a23 1
 * $Header: /users/mer/pg/src/storage/lmgr/RCS/lock.c,v 1.18 1991/11/14 19:39:58 kemnitz Exp mer $
d821 2
a822 1
  XIDLookupEnt	*xidLook,*tmp;
@


1.18
log
@protos checkin.
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.17 91/11/08 15:45:30 kemnitz Exp Locker: kemnitz $
d172 1
a172 1
  shmemName = (char *) palloc((unsigned)(strlen(tabName)+10));
@


1.17
log
@fixed prototypes.
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.16 91/08/29 23:52:03 mer Exp Locker: kemnitz $
a53 5
/* -------------------
 * hash function from (hash_fn.c)
 * -------------------
 */
extern int tag_hash();
d404 1
a404 1
  TransactionIdStore(myXid, &item.tag.xid);
d448 1
a448 1
  status = LockResolveConflicts(ltable, lock, lockt, myXid, MyPid);
d511 1
a511 1
  TransactionIdStore(xid, &item.tag.xid);
d719 1
a719 1
  TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
d794 1
a794 1
  (void) ProcLockWakeup(&(lock->waitProcs), ltable, lock);
d827 1
a827 1
  int		found;
d921 1
a921 1
      (void) ProcLockWakeup(waitQueue, ltable, lock);
@


1.16
log
@purge old lmgr code
@
text
@d23 1
a23 1
 * $Header: /users/mer/postgres/src/storage/lmgr/RCS/lock.c,v 1.15 1991/08/26 18:02:25 mer Exp mer $
a41 2

typedef int MASK;
@


1.15
log
@changes to support parallel backends
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.14 91/08/14 22:05:19 mer Exp $
a24 1
#include "storage/lock.h"
d28 1
d33 7
a40 1

d84 1
d106 1
d304 1
a304 1
TableId
d306 1
a306 1
TableId	tableId;
d308 1
a308 1
  TableId	newTableId;
d337 1
d339 1
a339 1
TableId		tableId;
d597 1
d601 1
a601 1
TableId		tableId;
d653 1
d655 1
a655 1
TableId	tableId;
d671 1
a671 1
    return (NULL);
d697 1
a697 1
    return(NULL);
d704 1
a704 1
    return(NULL);
d811 1
d821 1
a821 23
#ifdef NOTUSED
LockCompare( l1, l2)
LOCKTAG	*l1,*l2;
{
  char 	*res1,*res2;
  int	c1,c2;
  int	i;

  if (l2->tupleId.positionData != (unsigned short) (-10)) return;

  res1 = (char *) l1;
  res2 = (char *) l2;
  for (i=0;i<12;i++)
  {
    c1 = res1[i];
    c2 = res2[i];

    printf( "(%d,%d)",c1,c2);
  }
  printf("\n");
}
#endif NOTUSED

d823 1
a823 1
TableId		tableId;
d845 1
a845 1
    return;
d938 1
@


1.14
log
@fix bug that occurs when after a deadlock, now fixes locks stats correctly
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.13 91/08/12 15:16:24 mer Exp Locker: mer $
d33 2
a36 3
static
TransactionIdData InvalidXid;	/* Since no such beast is defined in xact.c */

a82 6
  /* -------------------
   * set up the invalid transaction id.
   * -------------------
   */
  PointerStoreInvalidTransactionId(&InvalidXid);

d404 1
a404 1
  item.tag.backendId = MyBackendId;
d446 1
a446 1
  status = LockResolveConflicts(ltable, lock, lockt, myXid, MyBackendId);
d482 1
a482 1
LockResolveConflicts(ltable,lock,lockt,xid,backendId)
d487 1
a487 1
int backendId;
d511 1
a511 1
  item.tag.backendId = backendId;
d717 1
a717 1
  item.tag.backendId = MyBackendId;
@


1.13
log
@multi-user fixes and functionality additions
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.12 91/08/03 00:59:52 mer Exp $
d452 1
d454 1
d622 2
d626 2
a745 1
    LOCK_PRINT("Deleting xid",(&lock->tag),lockt);
a765 1
    LOCK_PRINT("Release: deleting lock", (&lock->tag), lockt);
@


1.12
log
@bug fixes and extend locks
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.11 91/08/01 14:55:18 mer Exp Locker: mer $
a376 1
  myXid = GetCurrentTransactionId();
d401 2
d426 9
d442 1
a442 1
  if (result->nHolding == lock->nHolding)
d451 2
a452 2
  Assert(result->nHolding <= lock->nHolding);
  status = LockResolveConflicts(ltable, lock, lockt, myXid);
a462 2
    lock->nHolding++;
    lock->holders[lockt]++;
a484 5
 *
 * The current scheme is this.  Each lock has an (xid) field containing
 * the xid of the current owner.  As soon as there are multiple owners,
 * the xid field is changed to "ProcTxIdInvalid" to indicate that
 * it belongs to no one person.
d487 1
a487 1
LockResolveConflicts(ltable,lock,lockt,xid)
d492 1
d516 1
a516 1
  item.tag.backendId = MyBackendId;
d566 1
a566 1
    if (lock->holders[i] - myHolders[i])
d611 1
d659 2
a660 1
  if (!ltable)
d662 1
d669 2
d715 1
a716 2
  Assert (! TransactionIdEquals(&item.tag.xid, &InvalidXid));

d740 1
a740 1
    LOCK_PRINT("Deleting now",(&lock->tag),lockt);
d761 1
d776 1
a776 1
   * If there are still locks of the type I just released, no one
d781 1
a781 1
  if (! (lock->holders[lockt]))
a806 2
  lock->nHolding++;
  lock->holders[lockt]++;
d812 1
a812 129
/*
 * LockReplace -- remove a lock from a lock table.
 *
 * This module can be called from outside hence 'tableId' param
 * instead of 'ltable'.
 *
 * NOTE: that we are holding the lock already.  We have looked it
 *	up already by the time we get here.
 */
LockReplace(tableId, lock, lockt)
TableId	tableId;
LOCK 	*lock;
LOCKT 	lockt;
{
  PROC_QUEUE 	*waitQueue;
  XIDLookupEnt	*result,item;
  SPINLOCK 	masterLock;
  Boolean	found;
  LOCKTAB 	*ltable;
  HTAB 		*xidTable;

  Assert (tableId < NumTables);
  ltable = AllTables[tableId];
  if (!ltable)
    return (FALSE);

  LOCK_PRINT("Release",(&lock->tag),lockt);
  masterLock = ltable->ctl->masterLock;
  xidTable = ltable->xidHash;
  SpinAcquire(masterLock);
  /*
   * fix the general lock stats
   */
  lock->nHolding--;
  lock->holders[lockt]--;
  lock->nActive--;
  lock->activeHolders[lockt]--;

  Assert(lock->nActive >= 0);

  /* ------------------
   * Zero out all of the tag bytes (this clears the padding bytes for long
   * word alignment and ensures hashing consistency).
   * ------------------
   */
  bzero(&item, XID_TAGSIZE);
  TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
  Assert (! TransactionIdEquals(&item.tag.xid, &InvalidXid));

  item.tag.lock = MAKE_OFFSET(lock);
  item.tag.backendId = MyBackendId;

  if (! (result = (XIDLookupEnt *)
	 hash_search(xidTable, (Pointer)&item, HASH_FIND, &found))|| !found)
  {
    SpinRelease(masterLock);
    elog(NOTICE,"LockReplace: xid table corrupted");
    return(FALSE);
  }
  /*
   * now check to see if I have any private locks.  If I do,
   * decrement the counts associated with them.
   */
  result->holders[lockt]--;
  result->nHolding--;

  /*
   * If this was my last hold on this lock, delete my entry
   * in the XID table.
   */
  if (! result->nHolding)
  {
    LOCK_PRINT("Deleting now",(&lock->tag),lockt);
    SHMQueueDelete(&result->queue);
    if (! (result = (XIDLookupEnt *)
	   hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found)) ||
	! found)
    {
      SpinRelease(masterLock);
      elog(NOTICE,"LockReplace: xid table corrupted");
      return(FALSE);
    }
  }


  if (! lock->nHolding)
  {
    /* if there's no one waiting in the queue,
     * we just released the last lock.
     * Delete it from the lock table.
     */

    Assert( ltable->lockHash->hash == tag_hash);
    lock = (LOCK *)
      hash_search(ltable->lockHash,(Pointer)&(lock->tag),HASH_REMOVE, &found);
    SpinRelease(masterLock);

    if ((! lock) || (!found))
    {
      elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
      return(FALSE);
    }
    return(TRUE);
  }

  /*
   * If there are still locks of the type I just released, no one
   * should be woken up.  Whoever is asleep will still conflict
   * with the remaining locks.
   */
  if (! (lock->holders[lockt]))
  {
    /* change the conflict mask.  No more of this lock type. */
    lock->mask &= BITS_OFF[lockt];
  }

  /*
   * Wake the first waiting process and grant him the lock if it
   * doesn't conflict.  The woken process must record the lock
   * himself.
   */
  waitQueue = &(lock->waitProcs);
  (void) ProcLockWakeup(waitQueue, ltable, lock);

  SpinRelease(masterLock);
  return(TRUE);
}


d833 1
@


1.11
log
@add backendId and dbId to hash table tags
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.10 91/07/29 16:52:13 mer Exp Locker: mer $
a424 7
  /* -----------------
   * nHolding tells how many have tried to acquire the lock.
   *------------------
   */
  lock->nHolding++;
  lock->holders[lockt]++;

d432 1
a432 1
  if (result->nHolding == (lock->nHolding - 1))
d441 1
a441 1
  Assert(result->nHolding < lock->nHolding);
d448 7
d456 1
d559 1
a559 1
  tmpMask = 1;
d798 2
a799 6
/*  This is now done before we actually attempt to get the lock in
 *  in LockAcquire()
 *
 *  lock->nHolding++;
 *  lock->holders[lockt]++;
 */
d1005 1
a1005 1
      for (i=0;i<nLockTypes;i++)
@


1.10
log
@clean up globals, clean up hash table interactons
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.9 91/07/29 07:19:44 mer Exp Locker: mer $
d410 1
d511 2
d709 2
d858 2
d1029 2
a1030 2
    if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found)) ||
	!found)
@


1.9
log
@add deadlock detection and move structs to lock.h
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.8 91/07/23 20:30:30 mer Exp Locker: mer $
d49 1
a49 1
extern bool LockingIsDisabled;
d369 1
a369 1
  lock = (LOCK *)hash_search(ltable->lockHash,(Pointer)lockName,ENTER,&found);
d411 1
a411 1
  result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, ENTER, &found);
d511 1
a511 1
	 hash_search(xidTable, (Pointer)&item, ENTER, &found)))
d665 1
a665 1
    hash_search(ltable->lockHash,(Pointer)lockName,FIND,&found);
d707 1
a707 1
	 hash_search(xidTable, (Pointer)&item, FIND, &found))|| !found)
d729 1
a729 1
	   hash_search(xidTable, (Pointer)&item, REMOVE, &found)) ||
d749 1
a749 1
      hash_search(ltable->lockHash,(Pointer)&(lock->tag),REMOVE, &found);
d854 1
a854 1
	 hash_search(xidTable, (Pointer)&item, FIND, &found))|| !found)
d876 1
a876 1
	   hash_search(xidTable, (Pointer)&item, REMOVE, &found)) ||
d895 1
a895 1
      hash_search(ltable->lockHash,(Pointer)&(lock->tag),REMOVE, &found);
d1022 1
a1022 1
    if ((! hash_search(ltable->xidHash, (Pointer)xidLook, REMOVE, &found)) ||
d1040 1
a1040 1
	hash_search(ltable->lockHash,(Pointer)&(lock->tag),REMOVE, &found);
d1093 10
@


1.8
log
@phantom bug is now real, this should fix it once and for all.
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.7 91/07/23 20:07:27 mer Exp Locker: mer $
d25 1
a26 1
#include "storage/proc.h"
a27 1
#include "storage/lock.h"
a37 3
static
int LockingDisabled = FALSE;

d45 3
a47 19
/* This is the control structure for a lock table.  It
 * lives in shared memory:
 *
 * tableID -- the handle used by the lock table's clients to
 *	refer to the table.
 *
 * nLockTypes -- number of lock types (READ,WRITE,etc) that
 *	are defined on this lock table
 *
 * conflictTab -- this is an array of bitmasks showing lock
 *	type conflicts. conflictTab[i] is a mask with the j-th bit
 *	turned on if lock types i and j conflict.
 *
 * prio -- each locktype has a priority, so, for example, waiting
 *	writers can be given priority over readers (to avoid
 *	starvation).
 *
 * masterlock -- synchronizes access to the table
 *
d49 1
a49 7
typedef struct lockctl {
  TableId	tableId;
  int		nLockTypes;
  int		conflictTab[MAX_LOCKTYPES];
  int		prio[MAX_LOCKTYPES];
  SPINLOCK	masterLock;
} LOCKCTL;
a50 83
/*
 * lockHash -- hash table on lock Ids,
 * xidHash -- hash on xid and lockId in case
 *	multiple processes are holding the lock
 * ctl - control structure described above.
 */
typedef struct ltable {
  HTAB		*lockHash;
  HTAB		*xidHash;
  LOCKCTL	*ctl;
} LOCKTAB;

/* -----------------------
 * A transaction never conflicts with its own locks.  Hence, if
 * multiple transactions hold non-conflicting locks on the same
 * data, private per-transaction information must be stored in the
 * XID table.  The tag is XID + shared memory lock address so that
 * all locks can use the same XID table.  The private information
 * we store is the number of locks of each type (holders) and the
 * total number of locks (nHolding) held by the transaction.
 *
 * NOTE: --
 * There were some problems with the fact that currently TransactionIdData
 * is a 5 byte entity and compilers long word aligning of structure fields.
 * If the 3 byte padding is put in front of the actual xid data then the
 * hash function (which uses XID_TAGSIZE when deciding how many bytes of a
 * struct to look at for the key) might only see the last two bytes of the xid.
 *
 * Clearly this is not good since its likely that these bytes will be the
 * same for many transactions and hence they will share the same entry in
 * hash table causing the entry to be corrupted.  For this long-winded
 * reason I have put the tag in a struct of its own to ensure that the
 * XID_TAGSIZE is computed correctly.  It used to be sizeof (SHMEM_OFFSET) +
 * sizeof(TransactionIdData) which != sizeof(XIDTAG).
 *
 * Finally since the hash function will now look at all 12 bytes of the tag
 * the padding bytes MUST be zero'd before use in hash_search() as they
 * will have random values otherwise.  Jeff 22 July 1991.
 * -----------------------
 */

typedef struct XIDTAG {
  SHMEM_OFFSET                lock;
  TransactionIdData           xid;
} XIDTAG;

typedef struct XIDLookupEnt {
  /* tag */
  XIDTAG tag;

  /* data */
  int			holders[MAX_LOCKTYPES];
  int			nHolding;
  SHM_QUEUE		queue;

} XIDLookupEnt;

#define XID_TAGSIZE (sizeof(XIDTAG))

/*
 * lock information:
 *
 * tag -- uniquely identifies the object being locked
 * mask -- union of the conflict masks of all lock types
 *	currently held on this object.
 * waitProcs -- queue of processes waiting for this lock
 * holders -- count of each lock type currently held on the
 *	lock.
 * nHolding -- total locks of all types.
 */
typedef struct Lock {
  /* hash key */
  LOCKTAG		tag;

  /* data */
  int			mask;
  PROC_QUEUE		waitProcs;
  int			holders[MAX_LOCKTYPES];
  int			nHolding;
  int			activeHolders[MAX_LOCKTYPES];
  int			nActive;
} LOCK;

d103 1
a103 1
 * LockDisable -- sets LockingDisabled flag to TRUE or FALSE.
d109 1
a109 1
  LockingDisabled = status;
d358 1
a358 1
  if (LockingDisabled)
d363 1
a363 1
  LOCK_PRINT("Acquire",lockName);
d447 1
d603 5
a607 1
  if (ProcSleep(waitQueue, ltable->ctl->masterLock, lockt, prio)!= NO_ERROR)
d609 3
a611 3
    /* ----------------
     * we could handle timeouts here
     * ----------------
d613 7
a619 2
    elog(NOTICE,"WaitOnLock: wakeup with error");
    return(STATUS_ERROR);
d653 1
a653 1
  if (LockingDisabled)
d726 1
a726 1
    LOCK_PRINT("Deleting now",(&lock->tag));
d829 1
a829 1
  LOCK_PRINT("Release",(&lock->tag));
d873 1
a873 1
    LOCK_PRINT("Deleting now",(&lock->tag));
d990 1
a990 1
    LOCK_PRINT("ReleaseAll",(&lock->tag));
d1053 1
a1053 1
       * himself.
@


1.7
log
@back out last "fix" things were right the first time
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.6 91/07/23 19:29:39 mer Exp Locker: mer $
d197 1
a197 1
  TransactionIdSetTransactionIdValue(&InvalidXid, NullTransactionIdValue);
@


1.6
log
@bug fix and computation of shmem size.
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.5 91/07/23 13:15:32 kemnitz Exp Locker: mer $
d197 1
a197 1
  TransactionIdSetTransactionIdValue(NullTransactionIdValue, &InvalidXid);
@


1.5
log
@fixed CAST complaint from sun 3 compiler.
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.4 91/07/23 10:03:32 kemnitz Exp $
d28 2
a30 1
#include "storage/lock.h"
a145 2
 * xid -- identifier of the (process) currently holding the
 *	lock or INVALID if many processed hold locks.
d197 1
a197 1
  TransactionIdSetTransactionIdValue(&InvalidXid, NullTransactionIdValue);
d1172 2
d1175 2
a1176 1
	size = 30*1024;		/* 30 K */
d1178 13
a1191 9
/* ------------------------------------
 * Dont forget to add in 2*(size needed for 2 hash tables)
 *  size =  	NLOCK_ENTS*sizeof(LOCK) +
 *		NLOCKS_PER_XACT*sizeof(XIDLookupEnt) +
 *		NBACKENDS*sizeof(PROC) +
 *		sizeof(LOCKCTL) +
 *		sizeof(PROC_HDR);
 * ------------------------------------
 */
@


1.4
log
@fixed cast that caused Sun compiler to barf.
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.3 91/07/22 17:34:33 mer Exp Locker: kemnitz $
d1037 1
a1037 1
  if (l2->tupleId.positionData != -10) return;
@


1.3
log
@eliminate noisy printf
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.2 91/07/22 14:22:57 mer Exp Locker: mer $
d1037 1
a1037 2
  if (l2->tupleId.positionData != (-10))
    return;
@


1.2
log
@incremental check in (not complete yet)
@
text
@d23 1
a23 1
 * $Header: RCS/lock.c,v 1.1 91/07/15 10:41:13 mer Exp Locker: mer $
a1150 1
      printf("lock %x: WAKEUP, nholding %d\n",lock,lock->nHolding);
@


1.1
log
@Initial revision
@
text
@d4 1
a4 1
 *  Outside modules can create a lock table and acquire/release 
d8 1
a8 1
 *  in proc.c.
d18 2
a19 2
 *  NOTE: This module is used to define new lock tables.  The 
 *	multi-level lock table (multi.c) used by the heap 
d23 1
a23 1
 * $Header$
d25 7
a31 6
#include "def.h"
#include "sem.h"
#include "shmem.h"
#include "proc.h"
#include "hsearch.h"
#include "lock.h"
d33 8
a40 2
private
int LockingDisabled = TRUE;
d44 1
d64 1
a64 1
 * 
d73 1
a73 1
  SPINLOCK	*masterLock;
d88 1
a88 1
/*
d96 19
d116 6
d124 1
a124 2
  XID		xid;
  SHMEM_OFFSET	lock;
d127 3
a129 3
  int		holders[MAX_LOCKTYPES];
  int		nHolding;
  SHM_QUEUE	queue;
a131 1
#define XID_TAGSIZE (sizeof(XID)+sizeof(SHMEM_OFFSET))
d133 2
d146 1
a146 1
 *	lock or INVALID if many processed hold locks.  
d148 1
a148 1
typedef struct LOCK {
d150 1
a150 1
  LOCKTAG	tag;
d153 6
a158 7
  int		mask;
  PROC_QUEUE	waitProcs;
  int		holders[MAX_LOCKTYPES];
  int		nHolding;
  int		activeHolders[MAX_LOCKTYPES];
  int		nActive;
  XID		xid;
d161 3
a163 5
#ifdef undefined

/*
 * I used this for testing.  It shows how the interface works
 *
a164 29
#define READ_LOCK	2
#define WRITE_LOCK	1

int Prios[] = {
  NULL,
  2,
  1
};

int Conflicts[] = {
  NULL,
  (1 << READ_LOCK) | (1 << WRITE_LOCK),
  (1 << WRITE_LOCK),
};


SingleTabInit()
{
  int tableId;

  tableId = LockTabInit("LockTable",Conflicts,Prios,2);
  if (! tableId) {
    elog(WARN,"couldnt initialize single-level LT");
  }
 return(tableId);
}
#endif

/* hash function from (hash_fn.c) */
d166 4
a169 1
/* from shmem.c */
d172 4
a175 1
/* map from tableId to the lock table structure */
d177 5
a181 1
/* no zero-th table */
d184 1
a184 1
/*
d187 1
a187 1
 *
d194 6
d201 6
a206 2
  /* remember 0th locktype is invalid */
  for (i=0;i<MAX_LOCKTYPES;i++,bit <<= 1) {
d212 1
a212 1
/*
d214 1
d229 1
a229 1
private
d241 2
a242 1
  for (i=0;i<ntypes;i++,prioP++,conflictsP++) {
d251 1
a251 1
 * Notes: 
d253 1
a253 1
 *	table.  This is because every shared hash table and spinlock 
d258 1
a264 1
  SPINLOCK *tmpLock;
d272 2
a273 1
  if (ntypes > MAX_LOCKTYPES) {
d277 1
a277 1
  } 
d279 2
a280 1
  if (NumTables > MAX_TABLES) {
d285 1
a285 1
  } 
d289 2
a290 1
  if (! shmemName) {
d297 2
a298 1
  if (! ltable) {
d304 5
a308 8
  /* find/acquire the spinlock for the table */
  sprintf(shmemName,"%s (spin)",tabName);
  tmpLock = SpinAlloc(shmemName);
  if (! tmpLock) {
    elog(WARN,"LockTabInit: couldn't create master lock %s",tabName);
    (void) pfree (shmemName);
    return(INVALID_TABLEID);
  }
d311 5
a315 1
  /* allocate a control structure from shared memory */
d319 3
a321 1
  if (! ltable->ctl) {
d326 6
a331 1
  if (! found) {
d333 1
a333 1
    ltable->ctl->masterLock = tmpLock;
d337 4
a340 1
  /* other modules refer to the lock table by a tableId */
d343 1
a343 1
  assert(NumTables <= MAX_TABLES);
d345 4
a348 2
  /* allocate a hash table for the lock tags.  This is used
   * to find the different locks. 
d350 1
a350 1
  info.keysize =  sizeof(LOCKTAG); 
d360 3
a362 2
  assert( ltable->lockHash->hash == tag_hash);
  if (! ltable->lockHash) {
d367 1
a367 1
  /*
d370 1
d382 2
a383 1
  if (! ltable->xidHash) {
a389 1
  SpinRelease(tmpLock);
d391 2
d397 1
a397 1
  else 
d403 1
a403 1
 *	lock table.  
d419 2
a420 1
  if (NumTables >= MAX_TABLES) {
d423 2
a424 1
  if (AllTables[tableId] == INVALID_TABLEID) {
a445 1
TRUSTED
d455 1
a455 1
  SPINLOCK 	*masterLock;
d458 1
a458 1
  XID		myXid;
d460 1
a460 1
  assert (tableId < NumTables);
d462 2
a463 1
  if (!ltable) {
d468 2
a469 1
  if (LockingDisabled) {
d475 1
a476 3
  assert( ltable->lockHash->hash == tag_hash);
  lock = (LOCK *)
    hash_search(ltable->lockHash,(Address)lockName,ENTER,&found);
d478 5
a482 1
  if (! lock) {
d487 1
a487 1
  myXid = TxStateXID();
d489 6
a494 2
  /* if there was nothing else there, complete initialization */
  if  (! found) {
a500 1
    assert (lock->tag.tupleId.blockNum == lockName->tupleId.blockNum);
d502 1
a502 4
    /* give the lock my xid.  If I created the thing, I own it. */
    XIDAssign(lock->xid,myXid);
    assert (! XIDEquals(lock->xid,XIDInvalid));
  } 
d504 4
a507 2
  /* 
   * add an element to the lock queue so that we can clear the 
d509 1
a510 1
#ifndef LCHAIN
d512 12
a523 4
  XIDAssign(item.xid,myXid);
  item.lock = MAKE_OFFSET(lock);
  if (! (result = (XIDLookupEnt *)
	 hash_search(xidTable, (Address)&item, ENTER, &found))) {
d526 3
a528 2
  } 
  if (!found) {
d532 1
a532 2
  } 
#endif
d534 8
a541 1
  /* 
d543 4
a546 1
   * cannot be a conflict.
d548 2
a549 2
  if (XIDEquals(lock->xid,myXid)) {

a553 3
#ifdef LCHAIN
    LkChainStore(tableId, (int *)lock, lockt);
#endif
d558 2
a559 1
  if (status == STATUS_OK) {
d561 3
a563 2
  } else if (status == STATUS_FOUND) 
    status = LockWait(ltable, tableId, lock, lockt);
a566 3
#ifdef LCHAIN
  LkChainStore(tableId, (int *)lock, lockt);
#endif
d570 1
a570 1
/*
d578 1
a578 1
 * the old ones.  
d581 1
a581 1
 *  there will not be a conflict with my own READ_LOCK.  If I 
d589 1
a589 1
 *
d595 1
a595 1
XID	xid;
d608 1
a608 39
  /*
   * First check to see if someone else owns the lock.
   * If one other process owns it, that process' XID 
   * is in the xid field.  If many other processes have acquired
   * the lock,XIDInvalid is in the lock->xid field.
   */
  if (! XIDEquals(lock->xid, XIDInvalid)) {

    /* someone else owns it.  or used to.  Now that I'm
     * here, we are going to change the lock ownership to
     * "many" (ProcTxIdInvalid).  Before we do that, store
     * the current lock stats in a new XID table entry
     * for the current process so he can find them if he
     * tries for another lock.  
     */
    XIDAssign(item.xid,lock->xid);
    item.lock = MAKE_OFFSET(lock);
    if (! (result = (XIDLookupEnt *) 
	   hash_search(xidTable, (Address)&item, ENTER, &found))) {
      elog(NOTICE,"LockAcquire: xid table corrupted");
      return(STATUS_ERROR);
    } 

    /* ignore the 'found' flag.  Even if there is already an
     * entry, it may be out of date.  (There shouldn't be
     * an entry, now that I think about it).
     */
    bcopy(lock->holders,result->holders,
	  nLockTypes*sizeof(* (lock->holders)));
    result->nHolding = lock->nHolding;

    /* 
     * now lock has multiple owners. Mark xid so that the next
     * person to acquire the lock realizes this.
     */
    XIDAssign(lock->xid, XIDInvalid);
  } 

  /*
d611 4
d616 3
a618 2
  XIDAssign(item.xid,xid);
  item.lock = MAKE_OFFSET(lock);
d620 2
a621 1
	 hash_search(xidTable, (Address)&item, ENTER, &found))) {
d624 1
a624 1
  } 
d627 4
a630 2
  if (! found) {
    /* we're not holding any type of lock yet.  Clear
d632 1
d639 1
a639 1
  /*
d641 1
a641 1
   * with mine, then I get the lock.  
d643 1
a643 1
   * Checking for conflict: lock->mask represents the types of 
d647 1
d649 2
a650 2
  if (! (ltable->ctl->conflictTab[lockt] & lock->mask)) {

d657 2
a658 1
  /* Rats.  Something conflicts. But it could still be my own
d661 1
d665 4
a668 2
  for (i=1;i<=nLockTypes;i++, tmpMask <<= 1) {
    if (lock->holders[i] - myHolders[i]) {
d673 4
a676 3
  /* now check again for conflicts.  'bitmask' describes the types
   * of locks held by other processes.  If one of these 
   * conflicts with the kind of lock that I want, there is a 
d678 1
d680 2
a681 1
  if (! (ltable->ctl->conflictTab[lockt] & bitmask)) {
d689 1
a689 1
  } 
d694 1
a694 1
LockWait(ltable, tableId, lock,lockt)
d709 1
a709 1
   * will not be true if/when people can be deleted from 
d712 7
a718 4
  if (ProcSleep(waitQueue, ltable->ctl->masterLock, lockt, prio)!= NO_ERROR) {

    /* we could handle timeouts here */
    elog(NOTICE,"LockWait: wakeup with error");
d727 1
a727 1
 *	release it.  
a734 1
TRUSTED
d741 1
a741 1
  SPINLOCK 	*masterLock;
d748 1
a748 1
  assert (tableId < NumTables);
d753 2
a754 1
  if (LockingDisabled) {
d760 1
d762 2
a763 1
  assert( ltable->lockHash->hash == tag_hash);
d765 1
a765 1
    hash_search(ltable->lockHash,(Address)lockName,FIND,&found);
d770 2
a771 1
  if (! lock) {
d777 2
a778 1
  if (! found) {
d784 1
a784 1
  assert(lock->nHolding > 0);
d794 1
a794 2
  XIDAssign(item.xid,TxStateXID());
  assert (! XIDEquals(item.xid,XIDInvalid));
d796 10
a805 1
  item.lock = MAKE_OFFSET(lock);
d807 2
a808 1
	 hash_search(xidTable, (Address)&item, FIND, &found))|| !found) {
d812 1
a812 1
  } 
d814 1
a814 1
   * now check to see if I have any private locks.  If I do, 
d820 1
a820 1
  /* 
d824 2
a825 1
  if (! result->nHolding) {
a826 1
#ifndef LCHAIN
a827 1
#endif
d829 3
a831 2
	   hash_search(xidTable, (Address)&item, REMOVE, &found)) || 
	! found) {
d835 1
a835 1
    } 
d839 5
a843 3
  if (! lock->nHolding) {
    /* if there's no one waiting in the queue, 
     * we just released the last lock.  
d845 1
d847 1
a847 2

    assert( ltable->lockHash->hash == tag_hash);
d849 1
a849 1
      hash_search(ltable->lockHash,(Address)&(lock->tag),REMOVE, &found);
d852 2
a853 1
    if ((! lock) || (!found))  {
d860 1
a860 1
  /*
d864 1
d866 2
a867 1
  if (! (lock->holders[lockt])) {
d872 1
a872 1
  /*
d876 1
d879 1
a879 1
  
a880 4
#ifdef LCHAIN
  if (status == TRUE)
    LkChainRemove(tableId, lock);
#endif
d892 6
a897 2
  lock->nHolding++;
  lock->holders[lockt]++;
d919 1
a919 1
  SPINLOCK 	*masterLock;
d924 1
a924 1
  assert (tableId < NumTables);
d941 1
a941 2
  XIDAssign(item.xid,TxStateXID());
  assert (! XIDEquals(item.xid,XIDInvalid));
d943 10
a952 1
  item.lock = MAKE_OFFSET(lock);
d954 2
a955 1
	 hash_search(xidTable, (Address)&item, FIND, &found))|| !found) {
d959 1
a959 1
  } 
d961 1
a961 1
   * now check to see if I have any private locks.  If I do, 
d967 1
a967 1
  /* 
d971 2
a972 1
  if (! result->nHolding) {
a973 1
#ifndef LCHAIN
a974 1
#endif
d976 3
a978 2
	   hash_search(xidTable, (Address)&item, REMOVE, &found)) || 
	! found) {
d982 1
a982 1
    } 
d986 4
a989 3
  if (! lock->nHolding) {
    /* if there's no one waiting in the queue, 
     * we just released the last lock.  
d993 1
a993 1
    assert( ltable->lockHash->hash == tag_hash);
d995 1
a995 1
      hash_search(ltable->lockHash,(Address)&(lock->tag),REMOVE, &found);
d998 2
a999 1
    if ((! lock) || (!found))  {
d1011 2
a1012 1
  if (! (lock->holders[lockt])) {
d1024 1
a1024 1
  
d1037 1
a1037 1
  if (l2->tupleId.offset != (-10))
d1042 2
a1043 1
  for (i=0;i<12;i++) {
a1051 1
TRUSTED
d1060 1
a1060 1
  SPINLOCK 	*masterLock;
d1066 1
a1066 1
  assert (tableId < NumTables);
d1080 10
a1089 3
  for (;;) {
    done = (xidLook->queue.prev == end);
    lock = (LOCK *) MAKE_PTR(xidLook->lock);
d1093 1
a1093 1
    /*
d1095 1
d1097 2
a1098 1
    if (XIDEquals(lock->xid,XIDInvalid)) {
d1101 3
a1103 1
      for (i=0;i<nLockTypes;i++) {
d1109 8
a1116 2
    } else {
      /* set nHolding to zero so that we can garbage collect the lock*/
d1119 7
a1125 3
    /* always remove the xidLookup entry, we're done with it now */
    if ((! hash_search(ltable->xidHash, (Address)xidLook, REMOVE, &found)) || 
	! found) {
d1129 1
a1129 1
    } 
d1131 3
a1133 2
    if (! lock->nHolding) {
      /*
d1135 2
a1136 1
       * the last lock. 
d1139 1
a1139 1
      assert( ltable->lockHash->hash == tag_hash);
d1141 3
a1143 2
	hash_search(ltable->lockHash,(Address)&(lock->tag),REMOVE, &found);
      if ((! lock) || (!found))  {
d1148 3
a1150 1
    } else {
d1152 1
a1152 1
      /*
d1156 1
d1171 2
a1172 2
#ifndef LCHAIN
InitLkChain()
d1174 14
a1188 2
#endif

@
