head	1.47;
access;
symbols
	release_4_2:1.47
	marc_alpha:1.42
	aix_ok:1.42;
locks; strict;
comment	@ * @;


1.47
date	94.06.25.00.43.46;	author aoki;	state Exp;
branches;
next	1.46;

1.46
date	94.06.16.03.23.06;	author aoki;	state Exp;
branches;
next	1.45;

1.45
date	94.02.07.11.44.51;	author aoki;	state Exp;
branches;
next	1.44;

1.44
date	94.02.02.00.22.27;	author marc;	state Exp;
branches;
next	1.43;

1.43
date	94.02.01.11.55.34;	author aoki;	state Exp;
branches;
next	1.42;

1.42
date	93.08.10.01.46.51;	author marc;	state Exp;
branches;
next	1.41;

1.41
date	93.07.09.22.27.48;	author andrew;	state Exp;
branches;
next	1.40;

1.40
date	93.06.01.16.19.36;	author mao;	state Exp;
branches;
next	1.39;

1.39
date	93.04.07.09.02.17;	author mao;	state Exp;
branches;
next	1.38;

1.38
date	93.03.10.05.54.49;	author mao;	state Exp;
branches;
next	1.37;

1.37
date	93.03.08.18.56.20;	author mao;	state Exp;
branches;
next	1.36;

1.36
date	92.09.05.23.41.58;	author mao;	state Exp;
branches;
next	1.35;

1.35
date	92.06.30.22.31.43;	author mao;	state Exp;
branches;
next	1.34;

1.34
date	92.06.11.17.49.25;	author mao;	state Exp;
branches;
next	1.33;

1.33
date	92.05.28.17.09.35;	author mao;	state Exp;
branches;
next	1.32;

1.32
date	92.01.29.21.32.50;	author mao;	state Exp;
branches;
next	1.31;

1.31
date	91.11.14.19.40.44;	author kemnitz;	state Exp;
branches;
next	1.30;

1.30
date	91.11.08.20.18.35;	author mao;	state Exp;
branches;
next	1.29;

1.29
date	91.11.07.06.05.53;	author mao;	state Exp;
branches;
next	1.28;

1.28
date	91.10.29.06.34.27;	author mao;	state Exp;
branches;
next	1.27;

1.27
date	91.10.29.06.33.22;	author mao;	state Exp;
branches;
next	1.26;

1.26
date	91.10.29.04.12.35;	author mao;	state Exp;
branches;
next	1.25;

1.25
date	91.10.29.00.11.52;	author mao;	state Exp;
branches;
next	1.24;

1.24
date	91.10.04.17.52.59;	author mao;	state Exp;
branches;
next	1.23;

1.23
date	91.10.03.15.07.32;	author mao;	state Exp;
branches;
next	1.22;

1.22
date	91.10.03.00.56.55;	author mao;	state Exp;
branches;
next	1.21;

1.21
date	91.09.28.20.04.03;	author mao;	state Exp;
branches;
next	1.20;

1.20
date	91.09.11.07.19.37;	author mao;	state Exp;
branches;
next	1.19;

1.19
date	91.09.10.23.27.19;	author mao;	state Exp;
branches;
next	1.18;

1.18
date	91.09.10.06.41.50;	author mao;	state Exp;
branches;
next	1.17;

1.17
date	91.09.09.23.58.55;	author mao;	state Exp;
branches;
next	1.16;

1.16
date	91.09.05.23.26.02;	author hong;	state Exp;
branches;
next	1.15;

1.15
date	91.08.22.06.33.09;	author mao;	state Exp;
branches;
next	1.14;

1.14
date	91.08.13.22.00.30;	author mao;	state Exp;
branches;
next	1.13;

1.13
date	91.08.08.05.53.28;	author mao;	state Exp;
branches;
next	1.12;

1.12
date	91.08.06.08.09.21;	author mao;	state Exp;
branches;
next	1.11;

1.11
date	91.08.06.01.41.44;	author mao;	state Exp;
branches;
next	1.10;

1.10
date	91.08.03.00.29.18;	author mao;	state Exp;
branches;
next	1.9;

1.9
date	91.07.29.16.52.28;	author mer;	state Exp;
branches;
next	1.8;

1.8
date	91.07.26.00.52.21;	author mao;	state Exp;
branches;
next	1.7;

1.7
date	91.07.24.23.37.03;	author mao;	state Exp;
branches;
next	1.6;

1.6
date	91.07.24.07.47.24;	author mao;	state Exp;
branches;
next	1.5;

1.5
date	91.07.22.22.21.11;	author mao;	state Exp;
branches;
next	1.4;

1.4
date	91.07.22.08.00.36;	author mao;	state Exp;
branches;
next	1.3;

1.3
date	91.07.22.05.32.38;	author mao;	state Exp;
branches;
next	1.2;

1.2
date	91.07.21.23.13.32;	author mao;	state Exp;
branches;
next	1.1;

1.1
date	91.07.09.00.12.09;	author mao;	state Exp;
branches;
next	;


desc
@sony jukebox storage manager
@


1.47
log
@someone did bzero instead of S_INIT_LOCK
@
text
@/*
 *  sj.c -- sony jukebox storage manager.
 *
 *	This code manages relations that reside on the sony write-once
 *	optical disk jukebox.
 */

#include "tmp/c.h"
#include "tmp/postgres.h"

#ifdef SONY_JUKEBOX

#include <sys/file.h>
#include <math.h>
#include "machine.h"

#include "tmp/miscadmin.h"

#include "storage/fd.h"
#include "storage/ipci.h"
#include "storage/ipc.h"
#include "storage/smgr.h"
#include "storage/shmem.h"
#include "storage/spin.h"

#include "utils/hsearch.h"
#include "utils/rel.h"
#include "utils/log.h"
#include "utils/memutils.h"

#include "access/htup.h"
#include "access/relscan.h"
#include "access/heapam.h"

#include "catalog/pg_platter.h"
#include "catalog/pg_plmap.h"
#include "catalog/pg_proc.h"

#include "storage/sj.h"

RcsId("$Header: sj.c,v 1.46 94/06/16 03:23:06 aoki Exp $");

/* globals used in this file */
SPINLOCK		SJCacheLock;	/* lock for cache metadata */
extern ObjectId		MyDatabaseId;	/* OID of database we have open */
extern Name		MyDatabaseName;	/* name of database we have open */

static File		SJCacheVfd;	/* vfd for cache data file */
static File		SJMetaVfd;	/* vfd for cache metadata file */
static File		SJBlockVfd;	/* vfd for nblocks file */
static SJCacheHeader	*SJHeader;	/* pointer to cache header in shmem */
static HTAB		*SJCacheHT;	/* pointer to hash table in shmem */
static SJCacheItem	*SJCache;	/* pointer to cache metadata in shmem */
static SJCacheTag	*SJNBlockCache;	/* pointer to nblock cache */

#ifndef	HAS_TEST_AND_SET

/*
 *  If we don't have test-and-set locks, then we need a semaphore for
 *  concurrency control.  This semaphore is in addition to the metadata
 *  lock, SJCacheLock, that we acquire before touching the cache metadata.
 *
 *  This semaphore is used in two ways.  During cache initialization, we
 *  use it to lock out all other backends that want cache access.  During
 *  normal processing, we control access to groups on which IO is in
 *  progress by holding this lock.  When we're done with initialization or
 *  IO, we do enough V's on the semaphore to satisfy all outstanding P's.
 */

static IpcSemaphoreId	SJWaitSemId;	/* wait semaphore */
static long		*SJNWaiting;	/* # procs sleeping on the wait sem */

#endif /* ndef HAS_TEST_AND_SET */

/* static buffer is for data transfer */
static char	SJCacheBuf[SJBUFSIZE];

/*
 *  When we have to do IO on a group, we avoid holding an exclusive lock on
 *  the cache metadata for the duration of the operation.  We do this by
 *  setting a finer-granularity lock on the group itself.  How we do this
 *  depends on whether we have test-and-set locks or not.  If so, it's
 *  easy; we set the TASlock on the item itself.  Otherwise, we use the
 *  'wait' semaphore described above.
 */

#ifdef HAS_TEST_AND_SET
#define SET_IO_LOCK(item) \
    item->sjc_gflags |= SJC_IOINPROG; \
    SpinRelease(SJCacheLock); \
    S_LOCK(&(item->sjc_iolock));
#else /* HAS_TEST_AND_SET */
#define SET_IO_LOCK(item) \
    item->sjc_gflags |= SJC_IOINPROG; \
    (*SJNWaiting)++; \
    SpinRelease(SJCacheLock); \
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */

#define GROUPNO(item)	(((char *) item) - ((char *) &(SJCache[0])))/sizeof(SJCacheItem)

/* globals defined elsewhere */
extern char		*DataDir;

/* routines declared in this file */
static void		_sjcacheinit();
static void		_sjwait_init();
static void		_sjunwait_init();
static void		_sjwait_io();
static void		_sjunwait_io();
static void		_sjtouch();
static void		_sjunpin();
static void		_sjregister();
static void		_sjregnblocks();
static void		_sjnewextent();
static void		_sjrdextent();
static void		_sjdirtylast();
static int		_sjfindnblocks();
static int		_sjwritegrp();
static int		_sjreadgrp();
static int		_sjgroupvrfy();
static int		_sjdowrite();
static Form_pg_plmap	_sjchoose();
static SJCacheItem	*_sjallocgrp();
static SJCacheItem	*_sjfetchgrp();
static SJHashEntry	*_sjhashop();
static int		_sjgetgrp();
static void		_sjdump();

/* routines declared elsewhere */
extern HTAB		*ShmemInitHash();
extern long		*ShmemInitStruct();
extern Relation		RelationIdGetRelation();
extern BlockNumber	pgjb_offset();
extern bool		pgjb_freespc();

/*
 *  sjinit() -- initialize the Sony jukebox storage manager.
 *
 *	We need to find (or establish) the mag-disk buffer cache metadata
 *	in shared memory and open the cache on mag disk.  The first backend
 *	to run that touches the cache initializes it.  All other backends
 *	running simultaneously will only wait for this initialization to
 *	complete if they need to get data out of the cache.  Otherwise,
 *	they'll return successfully immediately after attaching the cache
 *	memory, and will let their older sibling do all the work.
 */

int
sjinit()
{
    unsigned int metasize;
    bool metafound;
    HASHCTL info;
    bool initcache;
    char *cacheblk, *cachesave;
    int status;
    char *path;

    /*
     *  First attach the shared memory block that contains the disk
     *  cache metadata.  At the end of this block in shared memory is
     *  the hash table we use to do fast lookup on groups in the cache.
     */

    SpinAcquire(SJCacheLock);

#ifdef HAS_TEST_AND_SET
    metasize =
	MAXALIGN(SJCACHESIZE * sizeof(SJCacheItem)) +
	    MAXALIGN(sizeof(SJCacheHeader)) +
		MAXALIGN(SJNBLKSIZE * sizeof(SJCacheTag));
#else /* HAS_TEST_AND_SET */
    metasize =
	MAXALIGN(SJCACHESIZE * sizeof(SJCacheItem)) +
	    MAXALIGN(sizeof(SJCacheHeader)) +
		MAXALIGN(SJNBLKSIZE * sizeof(SJCacheTag)) +
		    MAXALIGN(sizeof(*SJNWaiting));
#endif /* HAS_TEST_AND_SET */
    cachesave = cacheblk = (char *) ShmemInitStruct("Jukebox cache metadata",
						    metasize, &metafound);

    if (cacheblk == (char *) NULL) {
	SpinRelease(SJCacheLock);
	return (SM_FAIL);
    }

    /*
     *  Order of items in shared memory is metadata header, number of
     *  processes sleeping on the wait semaphore (if no test-and-set locks),
     *  nblock cache, and jukebox cache entries.
     */

    SJHeader = (SJCacheHeader *) cacheblk;
    cacheblk += sizeof(SJCacheHeader);

#ifndef HAS_TEST_AND_SET
    SJNWaiting = (long *) cacheblk;
    cacheblk += sizeof(long);
#endif /* ndef HAS_TEST_AND_SET */

    SJNBlockCache = (SJCacheTag *) cacheblk;
    cacheblk += SJNBLKSIZE * sizeof(SJCacheTag);

    SJCache = (SJCacheItem *) cacheblk;

    /*
     *  Now initialize the pointer to the shared memory hash table.
     */

    info.keysize = sizeof(SJCacheTag);
    info.datasize = sizeof(int);
    info.hash = tag_hash;

    SJCacheHT = ShmemInitHash("Jukebox cache hash table",
			      SJCACHESIZE, SJCACHESIZE,
			      &info, (HASH_ELEM|HASH_FUNCTION));

    if (SJCacheHT == (HTAB *) NULL) {
	SpinRelease(SJCacheLock);
	return (SM_FAIL);
    }

    /*
     *  Okay, all our shared memory pointers are set up.  If we did not
     *  find the cache metadata entries in shared memory, or if the cache
     *  has not been initialized from disk, initialize it in this backend.
     */

    if (!metafound || !(SJHeader->sjh_flags & (SJH_INITING|SJH_INITED))) {
	initcache = true;
	bzero((char *) cachesave, metasize);
	SJHeader->sjh_flags = SJH_INITING;
#ifdef HAS_TEST_AND_SET
	S_INIT_LOCK(&(SJHeader->sjh_initlock));
	S_LOCK(&(SJHeader->sjh_initlock));
#else /* HAS_TEST_AND_SET */
	IpcSemaphoreLock(SJWaitSemId, 0, 1);
	*SJNWaiting = 1;
#endif /* HAS_TEST_AND_SET */
    } else {
	initcache = false;
    }

    /* don't need exclusive access anymore */
    SpinRelease(SJCacheLock);

    path = (char *) palloc(strlen(DataDir) + strlen(SJCACHENAME) + 2);
    sprintf(path, "%s/%s", DataDir, SJCACHENAME);

    SJCacheVfd = PathNameOpenFile(path, O_RDWR|O_CREAT|O_EXCL, 0600);
    if (SJCacheVfd < 0) {
	SJCacheVfd = PathNameOpenFile(path, O_RDWR, 0600);
	if (SJCacheVfd < 0) {

	    /* if we were initializing the metadata, note our surrender */
	    if (!metafound) {
		SJHeader->sjh_flags &= ~SJH_INITING;
		_sjunwait_init();
	    }

	    return (SM_FAIL);
	}
    }

    pfree(path);

    path = (char *) palloc(strlen(DataDir) + strlen(SJMETANAME) + 2);
    sprintf(path, "%s/%s", DataDir, SJMETANAME);

    SJMetaVfd = PathNameOpenFile(path, O_RDWR|O_CREAT|O_EXCL, 0600);
    if (SJMetaVfd < 0) {
	SJMetaVfd = PathNameOpenFile(path, O_RDWR, 0600);
	if (SJMetaVfd < 0) {

	    /* if we were initializing the metadata, note our surrender */
	    if (!metafound) {
		SJHeader->sjh_flags &= ~SJH_INITING;
		_sjunwait_init();
	    }

	    return (SM_FAIL);
	}
    }

    pfree(path);

    path = (char *) palloc(strlen(DataDir) + strlen(SJBLOCKNAME) + 2);
    sprintf(path, "%s/%s", DataDir, SJBLOCKNAME);

    SJBlockVfd = PathNameOpenFile(path, O_RDWR|O_CREAT|O_EXCL, 0600);
    if (SJBlockVfd < 0) {
	SJBlockVfd = PathNameOpenFile(path, O_RDWR, 0600);
	if (SJBlockVfd < 0) {

	    /* if we were initializing the metadata, note our surrender */
	    if (!metafound) {
		SJHeader->sjh_flags &= ~SJH_INITING;
		_sjunwait_init();
	    }

	    return (SM_FAIL);
	}
    }

    pfree(path);

    /*
     *  If it's our responsibility to initialize the shared-memory cache
     *  metadata, then go do that.  Sjcacheinit() will elog(FATAL, ...) if
     *  it can't initialize the cache, so we don't need to worry about a
     *  return value here.
     */

    if (initcache) {
	_sjcacheinit();
    }

    /*
     *  Finally, we need to initialize the data structures we use for
     *  communicating with the jukebox.
     */

    if (pgjb_init() == SM_FAIL)
	return (SM_FAIL);

    return (SM_SUCCESS);
}

static void
_sjcacheinit()
{
    int nbytes, nread;
    int nentries;
    int nblocks;
    int start, stop;
    int i;
    SJCacheItem *cur;
    SJHashEntry *result;
    bool found;

    /* sanity check */
    if ((SJHeader->sjh_flags & SJH_INITED)
	|| !(SJHeader->sjh_flags & SJH_INITING)) {
	elog(FATAL, "sj cache header metadata corrupted.");
    }

    /* suck in the metadata */
    if (FileSeek(SJMetaVfd, 0L, SEEK_SET) != 0)
	elog(FATAL, "cannot seek on jukebox metadata cache");

    nbytes = SJCACHESIZE * sizeof(SJCacheItem);
    bzero((char *) SJCache, nbytes);
    nread = FileRead(SJMetaVfd, (char *) SJCache, nbytes);

    /* be sure we got an integral number of entries */
    nentries = nread / sizeof(SJCacheItem);
    if ((nentries * sizeof(SJCacheItem)) != nread) {
	SJHeader->sjh_flags &= ~SJH_INITING;
	_sjunwait_init();
	elog(FATAL, "sj cache metadata file corrupted.");
    }

    /*
     *  Clear out the nblock cache
     */
    bzero((char *) SJNBlockCache, SJNBLKSIZE * sizeof(SJCacheTag));

    /* add every group that appears in the cache to the hash table */
    for (i = 0; i < nentries; i++) {
	cur = &(SJCache[i]);
	result = _sjhashop(&(cur->sjc_tag), HASH_ENTER, &found);

	/* store the group number for this key in the hash table */
	result->sjhe_groupno = i;

	/* no io in progress */
	cur->sjc_gflags &= ~SJC_IOINPROG;
	cur->sjc_refcount = 0;

#ifdef HAS_TEST_AND_SET
	S_UNLOCK(&(cur->sjc_iolock));
#endif HAS_TEST_AND_SET
    }

    /*
     *  Now construct the LRU list (free list).  Extents will be nominated
     *  for reuse in this order.  Since we have no usage information, we
     *  adopt the following policy:  any extents not yet allocated in the
     *  cache are come first in the list, in order.  These are followed by
     *  the allocated extents, in order.  The free list head is the first
     *  unallocated extent, and its tail is the last allocated one.  This
     *  list is doubly-linked and is not circular.
     *
     *  The bizarre loop, and the way that start and stop are set, are
     *  to allow me to do this in a single loop.  Trust me, this is simpler
     *  than the six special cases I used to have here.
     */

    start = nentries % SJCACHESIZE;
    stop = ((nentries - 1) + SJCACHESIZE) % SJCACHESIZE;

    i = start;
    for (;;) {
	cur = &SJCache[i];
	cur->sjc_freeprev = ((i - 1) + SJCACHESIZE) % SJCACHESIZE;
	cur->sjc_freenext = ((i + 1) + SJCACHESIZE) % SJCACHESIZE;
	i = (i + 1) % SJCACHESIZE;
	if (i == start)
	    break;
    }

    /* fix up the list's endpoints */
    SJCache[start].sjc_freeprev = -1;
    SJCache[stop].sjc_freenext = -1;

    /* set up cache metadata header struct */
    SJHeader->sjh_freehead = start;
    SJHeader->sjh_freetail = stop;
    SJHeader->sjh_nentries = 0;
    SJHeader->sjh_flags = SJH_INITED;
}

/*
 *  _sjunwait_init() -- Release initialization lock on the jukebox cache.
 *
 *	When we initialize the cache, we don't keep the cache semaphore
 *	locked.  Instead, we set a flag in the metadata to let other
 *	backends know that we're doing the initialization.  This lets
 *	others start running queries immediately, even if the cache is
 *	not yet populated.  If they want to look something up in the
 *	cache, they'll block on the flag we set, and wait for us to finish.
 *	If they don't need the jukebox, they can run unimpeded.  When we
 *	finish, we call _sjunwait_init() to release the initialization lock
 *	that we hold during initialization.
 *
 *	When we do this, either the cache is properly initialized, or
 *	we detected some error we couldn't deal with.  In either case,
 *	we no longer need exclusive access to the cache metadata.
 */

static void
_sjunwait_init()
{
#ifdef HAS_TEST_AND_SET

    S_UNLOCK(&(SJHeader->sjh_initlock));

#else /* HAS_TEST_AND_SET */

    /* atomically V the semaphore once for every waiting process */
    SpinAcquire(SJCacheLock);
    IpcSemaphoreUnlock(SJWaitSemId, 0, *SJNWaiting);
    *SJNWaiting = 0;
    SpinRelease(SJCacheLock);

#endif /* HAS_TEST_AND_SET */
}

/*
 *  _sjunwait_io() -- Release IO lock on the jukebox cache.
 *
 *	While we're doing IO on a particular group in the cache, any other
 *	process that wants to touch that group needs to wait until we're
 *	finished.  If we have TASlocks, then a wait lock appears on the
 *	group entry in the cache metadata.  Otherwise, we use the wait
 *	semaphore in the same way as for initialization, above.
 */

static void
_sjunwait_io(item)
    SJCacheItem *item;
{
    item->sjc_gflags &= ~SJC_IOINPROG;

#ifdef HAS_TEST_AND_SET
    S_UNLOCK(&(item->sjc_iolock));
#else /* HAS_TEST_AND_SET */

    /* atomically V the wait semaphore once for each sleeping process */
    SpinAcquire(SJCacheLock);

    if (*SJNWaiting > 0) {
	IpcSemaphoreUnlock(SJWaitSemId, 0, *SJNWaiting);
	*SJNWaiting = 0;
    }

    SpinRelease(SJCacheLock);
#endif /* HAS_TEST_AND_SET */
}

/*
 *  _sjwait_init() -- Wait for cache initialization to complete.
 *
 *	This routine is called when we want to access jukebox cache metadata,
 *	but someone else is initializing it.  When we return, the init lock
 *	has been released and we can retry our access.  On entry, we must be
 *	holding the cache metadata lock.
 */

static void
_sjwait_init()
{
#ifdef HAS_TEST_AND_SET
    SpinRelease(SJCacheLock);
    S_LOCK(&(SJHeader->sjh_initlock));
    S_UNLOCK(&(SJHeader->sjh_initlock));
#else /* HAS_TEST_AND_SET */
    (*SJNWaiting)++;
    SpinRelease(SJCacheLock);
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */
}

/*
 *  _sjwait_io() -- Wait for group IO to complete.
 *
 *	This routine is called when we discover that some other process is
 *	doing IO on a group in the cache that we want to use.  We need to
 *	wait for that IO to complete before we can use the group.  On entry,
 *	we must hold the cache metadata lock.  On return, we don't hold that
 *	lock, and the IO completed.  We can retry our access.
 */

static void
_sjwait_io(item)
    SJCacheItem *item;
{
#ifdef HAS_TEST_AND_SET
    SpinRelease(SJCacheLock);
    S_LOCK(&(item->sjc_iolock));
    S_UNLOCK(&(item->sjc_iolock));
#else /* HAS_TEST_AND_SET */
    (*SJNWaiting)++;
    SpinRelease(SJCacheLock);
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */
}

/*
 *  sjshutdown() -- shut down the jukebox storage manager.
 *
 *	We want to close the cache and metadata files, release all our open
 *	jukebox connections, and let the caller know we're done.
 */

int
sjshutdown()
{
    FileClose(SJCacheVfd);
    FileClose(SJMetaVfd);

    return (SM_SUCCESS);
}

/*
 *  sjcreate() -- Create the requested relation on the jukebox.
 *
 *	Creating a new relation requires us to make a new cache group,
 *	fill in the descriptor page, make sure everything is on disk,
 *	and create the new relation file to store the last page of data
 *	on magnetic disk.
 */

int
sjcreate(reln)
    Relation reln;
{
    SJCacheItem *item;
    SJGroupDesc *group;
    SJCacheTag tag;
    ObjectId dbid;
    ObjectId relid;
    File vfd;
    int grpno;
    int i;
    char *path;

    /*
     *  If the cache is in the process of being initialized, then we need
     *  to wait for initialization to complete.  If the cache is not yet
     *  initialized, and no one else is doing it, then we need to initialize
     *  it ourselves.  Sjwait_init() or sj_init() will release the cache
     *  lock for us.
     */

    SpinAcquire(SJCacheLock);
    if (!(SJHeader->sjh_flags & SJH_INITED)) {
	if (SJHeader->sjh_flags & SJH_INITING) {
	    _sjwait_init();
	} else {
	    sjinit();
	}
	return (sjcreate(reln));
    }
    SpinRelease(SJCacheLock);

    /*
     *  By here, cache is initialized.  We are aggressively lazy, and
     *  will not allocate an initial extent for this relation until it's
     *  actually used.  We just register an initial block count of zero.
     */

    if (reln->rd_rel->relisshared)
	tag.sjct_dbid = (ObjectId) 0;
    else
	tag.sjct_dbid = MyDatabaseId;

    tag.sjct_relid = reln->rd_id;
    tag.sjct_base = (BlockNumber) 0;

    _sjregnblocks(&tag);

    /* last thing to do is to create the mag-disk file to hold last page */
    if (reln->rd_rel->relisshared) {
	path = (char *) palloc(strlen(DataDir) + NAMEDATALEN + 2);
	sprintf(path, "%s/%.*s", DataDir, NAMEDATALEN,
		&(reln->rd_rel->relname.data[0]));
    } else {
	path = (char *) palloc(NAMEDATALEN + 1);
	sprintf(path, "%.*s", NAMEDATALEN, &(reln->rd_rel->relname.data[0]));
    }

    vfd = FileNameOpenFile(path, O_CREAT|O_RDWR|O_EXCL, 0600);

    pfree(path);

    return (vfd);
}

/*
 *  _sjregister() -- Make catalog entry for a new extent
 *
 *	When we create a new jukebox relation, or when we add a new extent
 *	to an existing relation, we need to make the appropriate entry in
 *	pg_plmap().  This routine does that.
 *
 *	On entry, we have item pinned; on exit, it's still pinned, and the
 *	system catalogs have been updated to reflect the presence of the
 *	new extent.
 */

static void
_sjregister(item, group)
    SJCacheItem *item;
    SJGroupDesc *group;
{
    Relation plmap;
    ObjectId plid;
    Form_pg_plmap plmdata;
    HeapTuple plmtup;

    /*
     *  Choose a platter to put the new extent on.  This returns a filled-in
     *  pg_plmap tuple data part to insert into the system catalogs.  The
     *  choose routine also figures out where to place the extent on the
     *  platter.
     *
     *	Sjchoose() palloc's and fills in plmdata; we free it later in this
     *  routine.
     */

    plmdata = _sjchoose(item);

    /* record plid, offset, extent size for caller */
    group->sjgd_plid = plmdata->plid;
    group->sjgd_jboffset = plmdata->ploffset;
    group->sjgd_extentsz = plmdata->plextentsz;

    plmtup = (HeapTuple) heap_addheader(Natts_pg_plmap,
					sizeof(FormData_pg_plmap),
					(char *) plmdata);

    /* clean up the memory that heap_addheader() palloc()'ed for us */
    plmtup->t_oid = InvalidObjectId;
    bzero((char *) &(plmtup->t_chain), sizeof(plmtup->t_chain));

    /* open the relation and lock it */
    plmap = heap_openr(Name_pg_plmap);
    RelationSetLockForWrite(plmap);

    /* insert the new catalog tuple */
    heap_insert(plmap, plmtup, (double *) NULL);

    /* done */
    heap_close(plmap);

    /* be tidy */
    pfree((char *) plmtup);
    pfree((char *) plmdata);
}

/*
 *  _sjchoose() -- Choose a platter to receive a new extent.
 *
 *	Allocation strategy is:
 *
 *	  + For the first extent of a new relation, put it on the first
 *	    with room for a new relation.  The policy for allocating new
 *	    relations to a platter is implemented by pgjb_freespc().
 *
 *	  + For second and subsequent extents of an existing relation:
 *
 *	    -  If there's a platter holding another extent for this
 *	       relation, and that platter has room for this extent,
 *	       allocate it there.  NOTE:  this is true in the current
 *	       implementation, but it's a side effect of the way in which
 *	       we scan for free space on platters (we consider platters
 *	       in the same order every time we look).
 *
 *	    -  Otherwise, allocate the extent on the first platter with
 *	       space for a new extent.
 */

static Form_pg_plmap
_sjchoose(item)
    SJCacheItem *item;
{
    Relation plat;
    TupleDescriptor platdesc;
    HeapScanDesc platscan;
    HeapTuple plattup;
    Buffer buf;
    Form_pg_plmap plmdata;
    ObjectId plid;
    Datum d;
    Name platname;
    char *plname;
    bool isnull;
    bool done;
    int alloctype;

    /* allocate the tuple form */
    plmdata = (Form_pg_plmap) palloc(sizeof(FormData_pg_plmap));
    plname = (char *) palloc(sizeof(NameData) + 1);

    plat = heap_openr(Name_pg_platter);

    /*
     *  We do short-term (non-two-phase) locking on the platter relation
     *  in order to guarantee serial allocations.
     */

    RelationSetLockForWrite(plat);

    platdesc = RelationGetTupleDescriptor(plat);
    platscan = heap_beginscan(plat, false, NowTimeQual, 0, NULL);

    /* figure out if this is a new or an old relation allocation */
    alloctype = (item->sjc_tag.sjct_base > 0 ? SJOLDRELN : SJNEWRELN);

    /* find a qualifying tuple in pg_platter */
    plattup = heap_getnext(platscan, false, &buf);
    if (!HeapTupleIsValid(plattup))
	elog(WARN, "_sjchoose: no platters in pg_plmap");

    done = false;
    do {
	/* get platter OID, name */
	plid = plmdata->plid = plattup->t_oid;
	d = (Datum) heap_getattr(plattup, buf, Anum_pg_platter_plname,
				 platdesc, &isnull);
	platname = DatumGetName(d);
	strncpy(plname, &(platname->data[0]), sizeof(NameData));
	plname[sizeof(NameData)] = '\0';

	done = pgjb_freespc(plname, plid, alloctype);

	/* done with this tuple */
	ReleaseBuffer(buf);

	/* next tuple */
	if (!done) {
	    plattup = heap_getnext(platscan, false, &buf);
	    if (!HeapTupleIsValid(plattup))
		elog(WARN, "_sjchoose: no space on platters in pg_plmap");
	}
    } while (!done);

    /* init the rest of the fields */
    plmdata->pldbid = item->sjc_tag.sjct_dbid;
    plmdata->plrelid = item->sjc_tag.sjct_relid;
    plmdata->plblkno = item->sjc_tag.sjct_base;
    plmdata->plextentsz = SJEXTENTSZ;
    plmdata->ploffset = pgjb_offset(plname, plmdata->plid, plmdata->plextentsz);

    /* no longer need an exclusive lock for the allocation */
    RelationUnsetLockForWrite(plat);

    heap_endscan(platscan);
    heap_close(plat);

    /* save platter name, id, offset in item */
    bcopy(plname, &(item->sjc_plname.data[0]), sizeof(NameData));
    item->sjc_plid = plmdata->plid;
    item->sjc_jboffset = plmdata->ploffset;

    return (plmdata);
}

/*
 *  _sjallocgrp() -- Allocate a new group in the cache for use by some
 *		    relation.
 *
 *	If there are any unused slots in the cache, we just return one
 *	of those.  Otherwise, we need to kick out the least-recently-used
 *	group and make room for another.
 *
 *	On entry, we hold the cache metadata lock.  On exit, we still hold
 *	it.  In between, we may release it in order to do I/O on the cache
 *	group we're kicking out, if we have to do that.
 */

static SJCacheItem *
_sjallocgrp(grpno)
    int *grpno;
{
    SJCacheItem *item;

    /* free list had better not be empty */
    if (SJHeader->sjh_nentries == SJCACHESIZE)
	elog(FATAL, "_sjallocgrp:  no groups on free list!");

    /*
     *  Get a new group off the free list.  As a side effect, _sjgetgrp()
     *  bumps the ref count on the group for us.
     */

    *grpno = _sjgetgrp();

    item = &SJCache[*grpno];

    return (item);
}

/*
 *  _sjgetgrp() -- Get a group off the free list.
 *
 *	This routine returns the least-recently used group on the free list
 *	to the caller.  If necessary, the (old) contents of the group are
 *	forced to the platter.  On entry, we hold the cache metadata lock.
 *	We release it and mark IOINPROG on the group if we need to do any
 *	io.  We reacquire the lock before returning.
 *
 *	We know that there's something on the free list when we call this
 *	routine.
 *
 *	There's an interesting problem with write-once media that we have
 *	to deal with here.  It is possible in postgres for a half-full
 *	buffer to be flushed to stable storage, then to be reloaded into
 *	the buffer cache, filled completely, and for a new page to be
 *	allocated before the old page is flushed again.  If this happens
 *	to us, it's possible for the half-full page to get flushed all the
 *	way through to an optical disk platter, where it can never be
 *	overwritten.
 *
 *	In order to deal with this, we probe the buffer manager for all
 *	dirty blocks it has that live on an extent before we flush the
 *	extent to permanent storage.
 */

static int
_sjgetgrp()
{
    SJCacheItem *item;
    int grpno;
    int where;
    long loc;
    bool found;
    int grpoffset;
    BlockNumber nblocks;
    Relation reln;
    bool dirty;
    int i;
    int offset;
    ObjectId dbid;
    ObjectId relid;
    BlockNumber base;

    /* pull the least-recently-used group off the free list */
    grpno = SJHeader->sjh_freehead;
    item = &(SJCache[grpno]);
    _sjtouch(item);

    /* if it was previously a valid group, remove it from the hash table */
    if (item->sjc_oid != InvalidObjectId)
	_sjhashop(&(item->sjc_tag), HASH_REMOVE, &found);

    /*
     *  See if we need to flush the group to the jukebox.  If we're working
     *  with an entirely new item (the corresponding cache slot is empty),
     *  dbid == relid == base == 0, so we can ignore the flags.  Otherwise,
     *  we check every flags entry in the group descriptor to see if anyone
     *  wants to get flushed.
     */

    dirty = false;
    if (item->sjc_tag.sjct_dbid != 0 || item->sjc_tag.sjct_relid != 0) {
	if (MUST_FLUSH(item->sjc_gflags)) {
	    dirty = true;
	} else {
	    for (i = 0; i < SJGRPSIZE; i++) {
		if (MUST_FLUSH(item->sjc_flags[i])) {
		    dirty = true;
		    break;
		}
	    }
	}
    }

    if (!dirty)
	return (grpno);

    /*
     *  By here, we need to force the group to stable storage outside the
     *  cache.  Mark IOINPROG on the group (in fact, this shouldn't matter,
     *  since no one should be able to get at it -- we just got it off the
     *  free list and removed its hash table entry), release our exclusive
     *  lock, and write it out.
     */

    SET_IO_LOCK(item);

    if (_sjreadgrp(item, grpno) == SM_FAIL) {
	_sjunwait_io(item);
	elog(FATAL, "_sjgetgrp:  cannot read group %d", grpno);
    }

    /*
     *  Probe the buffer manager for dirty blocks that belong in this
     *  extent.  The buffer manager will copy them into the space we
     *  pass in, and will mark them clean in the buffer cache.
     */

    dbid = item->sjc_tag.sjct_dbid;
    relid = item->sjc_tag.sjct_relid;
    base = item->sjc_tag.sjct_base;

    for (i = 0; i < SJGRPSIZE; i++) {
	if (MUST_FLUSH(item->sjc_flags[i])) {
	    offset = (i * BLCKSZ) + JBBLOCKSZ;
	    DirtyBufferCopy(dbid, relid, base + i, &(SJCacheBuf[offset]));
	}
    }

    nblocks = _sjfindnblocks(&(item->sjc_tag));

    if (pgjb_wrtextent(item, nblocks, &(SJCacheBuf[0])) == SM_FAIL) {
	_sjunwait_io(item);
	elog(FATAL, "_sjfree:  cannot free group.");
    }

    _sjunwait_io(item);

    /* give us back our exclusive lock */
    SpinAcquire(SJCacheLock);

    return (grpno);
}

static SJCacheItem *
_sjfetchgrp(dbid, relid, blkno, grpno)
    ObjectId dbid;
    ObjectId relid;
    int blkno;
    int *grpno;
{
    SJCacheItem *item;
    SJHashEntry *entry;
    bool found;
    SJCacheTag tag;

    SpinAcquire(SJCacheLock);

    tag.sjct_dbid = dbid;
    tag.sjct_relid = relid;
    tag.sjct_base = blkno;

    entry = _sjhashop(&tag, HASH_FIND, &found);

    if (found) {
	*grpno = entry->sjhe_groupno;
	item = &(SJCache[*grpno]);

	if (item->sjc_gflags & SJC_IOINPROG) {
	    _sjwait_io(item);
	    return (_sjfetchgrp(dbid, relid, blkno, grpno));
	}

	_sjtouch(item);

	SpinRelease(SJCacheLock);
    } else {
	item = _sjallocgrp(grpno);

	/*
	 *  Possible race condition:  someone else instantiated the extent
	 *  we want while we were off allocating a group for it.  If that
	 *  happened, we want to put our just-allocated group back on the
	 *  free list for someone else to use.
	 */

	entry = _sjhashop(&tag, HASH_FIND, &found);
	if (found) {
	    /*
	     *  Put the just-allocated group back on the free list.  This
	     *  requires us to reenter it into the hash table if it refers
	     *  to actual data.  We only want to do this if we got a different
	     *  free group from the other process.
	     */

	    if (entry->sjhe_groupno != *grpno) {
		if (item->sjc_oid != InvalidObjectId)
		    (void) _sjhashop(&(item->sjc_tag), HASH_ENTER, &found);
		_sjunpin(item);
	    }

	    item = &(SJCache[entry->sjhe_groupno]);

	    /* if io in progress, wait for it to complete and try again */
	    if (item->sjc_gflags & SJC_IOINPROG) {
		_sjunpin(item);
		_sjwait_io(item);
		return (_sjfetchgrp(dbid, relid, blkno, grpno));
	    }

	    SpinRelease(SJCacheLock);
	} else {

	    /* okay, we need to read the extent from a platter */
	    bcopy((char *) &tag, (char *) &(item->sjc_tag), sizeof(tag));
	    entry = _sjhashop(&tag, HASH_ENTER, &found);
	    entry->sjhe_groupno = *grpno;

	    SET_IO_LOCK(item);

	    /* read the extent off the optical platter */
	    _sjrdextent(item);

	    /* update the magnetic disk cache */
	    _sjwritegrp(item, *grpno);

	    /* done, release IO lock */
	    _sjunwait_io(item);
	}
    }

    return (item);
}

/*
 *  _sjrdextent() -- Read an extent from an optical platter.
 *
 *	This routine prepares the SJCacheItem group for the pgjb_rdextent()
 *	routine to work with, and passes it along.  We don't have exclusive
 *	access to the cache metadata on entry, but we do have the IOINPROGRESS
 *	bit set on the item we're working with, so on one else will screw
 *	around with it.
 */

static void
_sjrdextent(item)
    SJCacheItem *item;
{
    Relation reln;
    HeapScanDesc hscan;
    HeapTuple htup;
    TupleDescriptor tupdesc;
    Datum d;
    Boolean n;
    Name plname;
    ScanKeyEntryData skey[3];

    /* first get platter id and offset from pg_plmap */
    reln = heap_openr(Name_pg_plmap);
    tupdesc = RelationGetTupleDescriptor(reln);
    ScanKeyEntryInitialize(&skey[0], 0x0, Anum_pg_plmap_pldbid,
			   ObjectIdEqualRegProcedure,
			   ObjectIdGetDatum(item->sjc_tag.sjct_dbid));
    ScanKeyEntryInitialize(&skey[1], 0x0, Anum_pg_plmap_plrelid,
			   ObjectIdEqualRegProcedure,
			   ObjectIdGetDatum(item->sjc_tag.sjct_relid));
    ScanKeyEntryInitialize(&skey[2], 0x0, Anum_pg_plmap_plblkno,
			   Integer32EqualRegProcedure,
			   Int32GetDatum(item->sjc_tag.sjct_base));
    hscan = heap_beginscan(reln, false, NowTimeQual, 3, &skey[0]);

    /*
     *  if there is no matching entry in the platter map, then we're
     *  asking for an extent that has not yet been allocated.  in this
     *  case, we return a zero-filled extent.  this happens, for example,
     *  when we try to read the initial block of a relation before one
     *  has been written.
     */

    if (!HeapTupleIsValid(htup = heap_getnext(hscan, false, (Buffer *) NULL))) {
	heap_endscan(hscan);
	heap_close(reln);
	bzero(&(SJCacheBuf[0]), SJBUFSIZE);
	return;
    }

    d = (Datum) heap_getattr(htup, InvalidBuffer, Anum_pg_plmap_plid,
			     tupdesc, &n);
    item->sjc_plid = DatumGetObjectId(d);
    d = (Datum) heap_getattr(htup, InvalidBuffer, Anum_pg_plmap_ploffset,
			     tupdesc, &n);
    item->sjc_jboffset = DatumGetInt32(d);

    heap_endscan(hscan);
    heap_close(reln);

    /* now figure out the platter's name from pg_platter */
    reln = heap_openr(Name_pg_platter);
    tupdesc = RelationGetTupleDescriptor(reln);
    ScanKeyEntryInitialize(&skey[0], 0x0, ObjectIdAttributeNumber,
			   ObjectIdEqualRegProcedure,
			   ObjectIdGetDatum(item->sjc_plid));
    hscan = heap_beginscan(reln, false, NowTimeQual, 1, &skey[0]);

    if (!HeapTupleIsValid(htup = heap_getnext(hscan, false, (Buffer *) NULL))) {
	_sjunwait_io(item);
	elog(WARN, "_sjrdextent: cannot find platter oid %d", item->sjc_plid);
    }

    d = (Datum) heap_getattr(htup, InvalidBuffer, Anum_pg_platter_plname,
			     tupdesc, &n);
    plname = DatumGetName(d);
    bcopy(&(plname->data[0]), &(item->sjc_plname.data[0]), sizeof(NameData));

    heap_endscan(hscan);
    heap_close(reln);

    /*
     *  Okay, by here, we have all the fields in item filled in except for
     *  sjc_oid, sjc_gflags, and sjc_flags[].  Those are all filled in by
     *  pgjb_rdextent(), so we call that routine to do the work.
     */

    if (pgjb_rdextent(item, &SJCacheBuf[0]) == SM_FAIL) {
	_sjunwait_io(item);
	elog(WARN, "read of extent <%d,%d,%d> from platter %d failed",
		   item->sjc_tag.sjct_dbid, item->sjc_tag.sjct_relid,
		   item->sjc_tag.sjct_base, item->sjc_plid);
    }
}

/*
 *  _sjtouch() -- Increment reference count on the supplied item.
 *
 *	If this is the first reference to the item, we remove it from the
 *	free list.  On entry and exit, we hold SJCacheLock.  If we pulled
 *	the item off the free list, we adjust SJHeader->sjh_nentries.
 */

static void
_sjtouch(item)
    SJCacheItem *item;
{
    /*
     *  Bump the reference count to this group.  If it's the first
     *  reference, pull the group off the free list.
     */

    if (++(item->sjc_refcount) == 1) {

	/* if at the start of the free list, adjust 'head' pointer */
	if (item->sjc_freeprev != -1)
	    SJCache[item->sjc_freeprev].sjc_freenext = item->sjc_freenext;
	else
	    SJHeader->sjh_freehead = item->sjc_freenext;

	/* if at the end of the free list, adjust 'tail' pointer */
	if (item->sjc_freenext != -1)
	    SJCache[item->sjc_freenext].sjc_freeprev = item->sjc_freeprev;
	else
	    SJHeader->sjh_freetail = item->sjc_freeprev;

	/* disconnect from free list */
	item->sjc_freeprev = item->sjc_freenext = -1;

	/* keep track of number of groups allocated */
	(SJHeader->sjh_nentries)++;
    }
}

/*
 *  _sjunpin() -- Decrement reference count on the supplied item.
 *
 *	If we are releasing the last reference to the supplied item, we put
 *	it back on the free list.  On entry and exit, we do not hold the
 *	cache lock.  We must acquire it in order to carry out the requested
 *	release.
 */

static void
_sjunpin(item)
    SJCacheItem *item;
{
    int grpno;

    /* exclusive access */
    SpinAcquire(SJCacheLock);

    /* item had better be pinned */
    if (item->sjc_refcount <= 0)
	elog(FATAL, "_sjunpin: illegal reference count");

    /*
     *  Unpin the item.  If this is the last reference, put the item at the
     *  end of the free list.  Implemenation note:  if SJHeader->sjh_freehead
     *  is -1, then the list is empty, and SJHeader->sjh_freetail is also -1.
     */

    if (--(item->sjc_refcount) == 0) {

	grpno = GROUPNO(item);

	if (SJHeader->sjh_freehead == -1) {
	    SJHeader->sjh_freehead = grpno;
	} else {
	    item->sjc_freeprev = SJHeader->sjh_freetail;
	    SJCache[SJHeader->sjh_freetail].sjc_freenext = grpno;
	}

	/* put item at end of free list */
	SJHeader->sjh_freetail = grpno;
	(SJHeader->sjh_nentries)--;
    }

    SpinRelease(SJCacheLock);
}

static int
_sjwritegrp(item, grpno)
    SJCacheItem *item;
    int grpno;
{
    long seekpos;
    long loc;
    int nbytes, i;
    char *buf;

    /* first update the metadata file */
    seekpos = grpno * sizeof(*item);

    if ((loc = FileSeek(SJMetaVfd, seekpos, SEEK_SET)) != seekpos)
	return (SM_FAIL);

    nbytes = sizeof(*item);
    buf = (char *) item;
    while (nbytes > 0) {
	i = FileWrite(SJMetaVfd, buf, nbytes);
	if (i < 0)
	    return (SM_FAIL);
	nbytes -= i;
	buf += i;
    }

    FileSync(SJMetaVfd);

    /* now update the cache file */
    seekpos = grpno * SJBUFSIZE;
    if ((loc = FileSeek(SJCacheVfd, seekpos, SEEK_SET)) != seekpos)
	return (SM_FAIL);

    nbytes = SJBUFSIZE;
    buf = &(SJCacheBuf[0]);
    while (nbytes > 0) {
	i = FileWrite(SJCacheVfd, buf, nbytes);
	if (i < 0)
	    return (SM_FAIL);
	nbytes -= i;
	buf += i;
    }

    FileSync(SJCacheVfd);

    return (SM_SUCCESS);
}

/*
 *  sjextend() -- extend a relation by one block.
 */

int
sjextend(reln, buffer)
    Relation reln;
    char *buffer;
{
    SJCacheItem *item;
    SJHashEntry *entry;
    SJCacheTag tag;
    int grpno;
    int nblocks;
    int base;
    int offset;
    bool found;
    int grpoffset;
    long seekpos;

    RelationSetLockForExtend(reln);
    nblocks = sjnblocks(reln);
    base = (nblocks / SJGRPSIZE) * SJGRPSIZE;

    SpinAcquire(SJCacheLock);

    /*
     *  If the highest extent is full, we need to allocate a new group in
     *  the cache.  As a side effect, _sjnewextent will release SJCacheLock.
     *  We need to reacquire it immediately afterwards.
     */

    if ((nblocks % SJGRPSIZE) == 0) {
	_sjnewextent(reln, base);
	SpinAcquire(SJCacheLock);
    }

    if (reln->rd_rel->relisshared)
	tag.sjct_dbid = (ObjectId) 0;
    else
	tag.sjct_dbid = MyDatabaseId;

    tag.sjct_relid = reln->rd_id;
    tag.sjct_base = base;

    entry = _sjhashop(&tag, HASH_FIND, &found);

    if (!found) {
	SpinRelease(SJCacheLock);
	elog(WARN, "sjextend:  hey mao:  your group is missing.");
    }

    /* find the item and block in the item to write */
    grpno = entry->sjhe_groupno;
    item = &SJCache[grpno];
    grpoffset = nblocks % SJGRPSIZE;

    /*
     *  Okay, allocate the next block in this extent by marking it 'not
     *  missing'.  Once we've done this, we must hold the extend lock
     *  until end of transaction, since the number of allocated blocks no
     *  longer matches the block count visible to other backends.
     */

    if (!(item->sjc_flags[grpoffset] & SJC_MISSING)) {
	SpinRelease(SJCacheLock);
	elog(WARN, "sjextend: cache botch: next block in group present");
    } else {
	item->sjc_flags[grpoffset] &= ~SJC_MISSING;
    }

    _sjtouch(item);

    SET_IO_LOCK(item);

    /* page is allocated */
    item->sjc_flags[grpoffset] = SJC_CLEAR;

    /* verify group descriptor data in the cache file */
    if (_sjgroupvrfy(item, grpno) == SM_FAIL) {
	_sjunpin(item);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    /* write the page */
    seekpos = (grpno * SJBUFSIZE) + ((nblocks % SJGRPSIZE) * BLCKSZ)
	      + JBBLOCKSZ;
    if (FileSeek(SJCacheVfd, seekpos, SEEK_SET) != seekpos) {
	elog(NOTICE, "sjextend: failed to seek to buffer lock (%d)", seekpos);
	_sjunpin(item);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    if (FileWrite(SJCacheVfd, buffer, BLCKSZ) != BLCKSZ) {
	elog(NOTICE, "sjextend: can't write page %d", nblocks);
	_sjunwait_io(item);
	_sjunpin(item);
	return (SM_FAIL);
    }

    /* write the updated cache metadata entry */
    seekpos = grpno * sizeof(*item);

    if (FileSeek(SJMetaVfd, seekpos, SEEK_SET) != seekpos) {
	elog(NOTICE, "sjextend: seek to %d on metadata file failed", seekpos);
	_sjunwait_io(item);
	_sjunpin(item);
	return (SM_FAIL);
    }

    if (FileWrite(SJMetaVfd, (char *) item, sizeof(*item)) < 0) {
	elog(NOTICE, "sjextend: write of metadata file failed");
	_sjunwait_io(item);
	_sjunpin(item);
	return (SM_FAIL);
    }

    /* success */
    _sjunwait_io(item);
    _sjunpin(item);

    tag.sjct_base = ++nblocks;
    _sjregnblocks(&tag);

    return (SM_SUCCESS);
}

static int
_sjreadgrp(item, grpno)
    SJCacheItem *item;
    int grpno;
{
    long seekpos;
    long loc;
    int nbytes, i;
    char *buf;
    SJGroupDesc *gdesc;

    /* get the group from the cache file */
    seekpos = grpno * SJBUFSIZE;
    if ((loc = FileSeek(SJCacheVfd, seekpos, SEEK_SET)) != seekpos) {
	elog(NOTICE, "_sjreadgrp: cannot seek");
	return (SM_FAIL);
    }

    nbytes = SJBUFSIZE;
    buf = &(SJCacheBuf[0]);
    while (nbytes > 0) {
	i = FileRead(SJCacheVfd, buf, nbytes);
	if (i < 0) {
	    elog(NOTICE, "_sjreadgrp: read failed");
	    return (SM_FAIL);
	}
	nbytes -= i;
	buf += i;
    }

    gdesc = (SJGroupDesc *) &(SJCacheBuf[0]);

    if (gdesc->sjgd_magic != SJGDMAGIC
	|| gdesc->sjgd_version != SJGDVERSION
	|| gdesc->sjgd_groupoid != item->sjc_oid) {

	elog(NOTICE, "_sjreadgrp: trashed cache");
	return (SM_FAIL);
    }

    return (SM_SUCCESS);
}

int
sjunlink(reln)
    Relation reln;
{
    return (SM_FAIL);
}

/*
 *  _sjnewextent() -- Add a new extent to a relation in the jukebox cache.
 */

static void
_sjnewextent(reln, base)
    Relation reln;
    BlockNumber base;
{
    SJHashEntry *entry;
    SJGroupDesc *group;
    SJCacheItem *item;
    bool found;
    int grpno;
    int i;

    item = _sjallocgrp(&grpno);

    if (reln->rd_rel->relisshared)
	item->sjc_tag.sjct_dbid = (ObjectId) 0;
    else
	item->sjc_tag.sjct_dbid = MyDatabaseId;

    item->sjc_tag.sjct_relid = (ObjectId) reln->rd_id;
    item->sjc_tag.sjct_base = base;

    entry = _sjhashop(&(item->sjc_tag), HASH_ENTER, &found);

    entry->sjhe_groupno = grpno;

    SET_IO_LOCK(item);

    /* set flags on item, initialize group descriptor block */
    item->sjc_gflags = SJC_CLEAR;
    for (i = 0; i < SJGRPSIZE; i++)
	item->sjc_flags[i] = SJC_MISSING;

    /* should be smarter and only bzero what we need to */
    bzero(SJCacheBuf, SJBUFSIZE);

    group = (SJGroupDesc *) (&SJCacheBuf[0]);
    group->sjgd_magic = SJGDMAGIC;
    group->sjgd_version = SJGDVERSION;

    if (reln->rd_rel->relisshared) {
	group->sjgd_dbid = (ObjectId) 0;
    } else {
	strncpy(&(group->sjgd_dbname.data[0]),
		&(MyDatabaseName->data[0]),
		sizeof(NameData));
	group->sjgd_dbid = (ObjectId) MyDatabaseId;
    }

    strncpy(&(group->sjgd_relname.data[0]),
	    &(reln->rd_rel->relname.data[0]),
	    sizeof(NameData));
    group->sjgd_relid = reln->rd_id;
    group->sjgd_relblkno = base;
    item->sjc_oid = group->sjgd_groupoid = newoid();

    /*
     *  Record the presence of the new extent in the system catalogs.  The
     *  plid, jboffset, and extentsz fields are filled in by _sjregister()
     *  or the routines that it calls.  Note that we do not force the new
     *  group descriptor block all the way to the optical platter here.
     *  We do decide where to place it, however, and must go to a fair amount
     *  of trouble elsewhere in the code to avoid allocating the same extent
     *  to a different relation, or block within the same relation.
     */

    _sjregister(item, group);

    /*
     *  Write the new group cache entry to disk.  Sjwritegrp() knows where
     *  the cache buffer begins, and forces out the group descriptor we
     *  just set up.
     */

    if (_sjwritegrp(item, grpno) == SM_FAIL) {
	_sjunwait_io(item);
	elog(FATAL, "_sjnewextent: cannot write new extent to disk");
    }

    _sjregnblocks(&(item->sjc_tag));

    /* can now release i/o lock on the item we just added */
    _sjunwait_io(item);

    /* no longer need the reference */
    _sjunpin(item);
}

/*
 *  _sjhashop() -- Do lookup, insertion, or deletion on the metadata hash
 *		   table in shared memory.
 *
 *	We don't worry about the number of entries in the hash table here;
 *	that's handled at a higher level (_sjallocgrp and _sjgetgrp).  We
 *	hold SJCacheLock on entry.
 */

static SJHashEntry *
_sjhashop(tagP, op, foundP)
    SJCacheTag *tagP;
    HASHACTION op;
    bool *foundP;
{
    SJHashEntry *entry;

    entry = (SJHashEntry *) hash_search(SJCacheHT, (char *) tagP, op, foundP);

    if (entry == (SJHashEntry *) NULL) {
	SpinRelease(SJCacheLock);
	elog(FATAL, "_sjhashop: hash table corrupt.");
    }

    if (*foundP) {
	if (op == HASH_ENTER) {
	    SpinRelease(SJCacheLock);
	    elog(WARN, "_sjhashop: cannot enter <%d,%d,%d>: already exists",
		 tagP->sjct_dbid, tagP->sjct_relid, tagP->sjct_base);
	}
    } else {
	if (op == HASH_REMOVE) {
	    SpinRelease(SJCacheLock);
	    elog(WARN, "_sjhashop: cannot delete <%d,%d,%d>: missing",
		 tagP->sjct_dbid, tagP->sjct_relid, tagP->sjct_base);
	}
    }

    return (entry);
}

int
sjopen(reln)
    Relation reln;
{
    char *path;
    int fd;
    extern char *relpath();

    path = relpath(&(reln->rd_rel->relname.data[0]));

    fd = FileNameOpenFile(path, O_RDWR, 0600);

    return (fd);
}

int
sjclose(reln)
    Relation reln;
{
    FileClose(reln->rd_fd);

    return (SM_SUCCESS);
}

int
sjread(reln, blocknum, buffer)
    Relation reln;
    BlockNumber blocknum;
    char *buffer;
{
    SJCacheItem *item;
    ObjectId reldbid;
    BlockNumber base;
    int offset;
    int grpno;
    long seekpos;

    /* fake successful read on non-existent data */
    if (sjnblocks(reln) <= blocknum) {
	bzero(buffer, BLCKSZ);
	return (SM_SUCCESS);
    }

    if (reln->rd_rel->relisshared)
	reldbid = (ObjectId) 0;
    else
	reldbid = MyDatabaseId;

    base = (blocknum / SJGRPSIZE) * SJGRPSIZE;

    item = _sjfetchgrp(reldbid, reln->rd_id, base, &grpno);

    /* shd expand _sjfetchgrp() inline to avoid extra semop()s */
    SpinAcquire(SJCacheLock);

    SET_IO_LOCK(item);

    /* First read and verify the group descriptor metadata */
    if (_sjgroupvrfy(item, grpno) == SM_FAIL) {
	_sjunpin(item);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    /* By here, group descriptor metadata is okay */
    seekpos = (grpno * SJBUFSIZE) + ((blocknum % SJGRPSIZE) * BLCKSZ)
	      + JBBLOCKSZ;
    if (FileSeek(SJCacheVfd, seekpos, SEEK_SET) != seekpos) {
	elog(NOTICE, "sjread: failed to seek to buffer lock (%d)", seekpos);
	_sjunpin(item);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    /* read the requested page */
    if (FileRead(SJCacheVfd, buffer, BLCKSZ) != BLCKSZ) {
	elog(NOTICE, "sjread: can't read page %d", blocknum);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    _sjunwait_io(item);
    _sjunpin(item);

    return (SM_SUCCESS);
}

static int
_sjgroupvrfy(item, grpno)
    SJCacheItem *item;
    int grpno;
{
    long seekpos;
    SJGroupDesc gdesc;

    seekpos = SJBUFSIZE * grpno;
    if (FileSeek(SJCacheVfd, seekpos, SEEK_SET) != seekpos) {
	elog(NOTICE, "sjgroupvrfy: Cannot seek to %d on sj cache file",
		     seekpos);
	return (SM_FAIL);
    }

    if (FileRead(SJCacheVfd, (char *) &gdesc, sizeof(gdesc)) < 0) {
	elog(NOTICE, "sjgroupvrfy: Cannot read group desc from sj cache file");
	return (SM_FAIL);
    }

    if (gdesc.sjgd_magic != SJGDMAGIC
	|| gdesc.sjgd_version != SJGDVERSION
	|| gdesc.sjgd_groupoid != item->sjc_oid) {

	elog(NOTICE, "sjgroupvrfy: trashed cache");
	return (SM_FAIL);
    }

    return (SM_SUCCESS);
}

int
sjwrite(reln, blocknum, buffer)
    Relation reln;
    BlockNumber blocknum;
    char *buffer;
{
    ObjectId reldbid;

    if (reln->rd_rel->relisshared)
	reldbid = (ObjectId) 0;
    else
	reldbid = MyDatabaseId;

    return (_sjdowrite(reldbid, reln->rd_id, blocknum, buffer));
}

static int
_sjdowrite(reldbid, relid, blocknum, buffer)
    ObjectId reldbid;
    ObjectId relid;
    BlockNumber blocknum;
    char *buffer;
{
    SJCacheItem *item;
    BlockNumber base;
    int offset;
    int grpno;
    int which;
    long seekpos;

    base = (blocknum / SJGRPSIZE) * SJGRPSIZE;

    item = _sjfetchgrp(reldbid, relid, base, &grpno);

    /* shd expand _sjfetchgrp() inline to avoid extra semop()s */
    SpinAcquire(SJCacheLock);

    which = blocknum % SJGRPSIZE;

    if (item->sjc_flags[which] & SJC_ONPLATTER) {
	SpinRelease(SJCacheLock);
	_sjunpin(item);
	elog(WARN, "sjwrite: optical platters are write-once, cannot rewrite");
    }

    SET_IO_LOCK(item);

    item->sjc_flags[which] &= ~SJC_MISSING;

    /* verify group descriptor data in the cache file */
    if (_sjgroupvrfy(item, grpno) == SM_FAIL) {
	_sjunpin(item);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    /* write the page */
    seekpos = (grpno * SJBUFSIZE) + ((blocknum % SJGRPSIZE) * BLCKSZ)
	      + JBBLOCKSZ;
    if (FileSeek(SJCacheVfd, seekpos, SEEK_SET) != seekpos) {
	elog(NOTICE, "sjwrite: failed to seek to buffer lock (%d)", seekpos);
	_sjunpin(item);
	_sjunwait_io(item);
	return (SM_FAIL);
    }

    if (FileWrite(SJCacheVfd, buffer, BLCKSZ) != BLCKSZ) {
	elog(NOTICE, "sjwrite: can't read page %d", blocknum);
	_sjunwait_io(item);
	_sjunpin(item);
	return (SM_FAIL);
    }

    /* write the updated cache metadata entry */
    seekpos = grpno * sizeof(*item);

    if (FileSeek(SJMetaVfd, seekpos, SEEK_SET) != seekpos) {
	elog(NOTICE, "sjwrite: seek to %d on metadata file failed", seekpos);
	_sjunwait_io(item);
	_sjunpin(item);
	return (SM_FAIL);
    }

    if (FileWrite(SJMetaVfd, (char *) item, sizeof(*item)) < 0) {
	elog(NOTICE, "sjwrite: write of metadata file failed");
	_sjunwait_io(item);
	_sjunpin(item);
	return (SM_FAIL);
    }

    _sjunwait_io(item);
    _sjunpin(item);

    return (SM_SUCCESS);
}

int
sjflush(reln, blocknum, buffer)
    Relation reln;
    BlockNumber blocknum;
    char *buffer;
{
    return (sjwrite(reln, blocknum, buffer));
}

int
sjblindwrt(dbstr, relstr, dbid, relid, blkno, buffer)
    char *dbstr;
    char *relstr;
    OID dbid;
    OID relid;
    BlockNumber blkno;
    char *buffer;
{
    return (_sjdowrite(dbid, relid, blkno, buffer));
}

/*
 *  sjnblocks() -- Return the number of blocks that appear in this relation.
 *
 *	Rather than compute this by walking through pg_plmap and fetching
 *	groups off of platters, we store the number of blocks currently
 *	allocated to a relation in a special Unix file.
 */

int
sjnblocks(reln)
    Relation reln;
{
    SJCacheTag tag;
    int nblocks;

    if (reln->rd_rel->relisshared)
	tag.sjct_dbid = (ObjectId) 0;
    else
	tag.sjct_dbid = MyDatabaseId;

    tag.sjct_relid = reln->rd_id;

    tag.sjct_base = (BlockNumber) _sjfindnblocks(&tag);

    return ((int) (tag.sjct_base));
}

/*
 *  _sjfindnblocks() -- Find block count for the (dbid,relid) pair.
 */

static int
_sjfindnblocks(tag)
    SJCacheTag *tag;
{
    int nbytes;
    int i;
    SJCacheTag *cachetag;
    SJCacheTag mytag;

    cachetag = SJNBlockCache;
    i = 0;
    while (i < SJNBLKSIZE && cachetag->sjct_relid != (ObjectId) 0) {
	if (cachetag->sjct_dbid == tag->sjct_dbid
	    && cachetag->sjct_relid == tag->sjct_relid) {
	    return (cachetag->sjct_base);
	}
	i++;
	cachetag++;
    }

    if (FileSeek(SJBlockVfd, 0L, SEEK_SET) != 0) {
	elog(FATAL, "_sjfindnblocks: cannot seek to zero on block count file");
    }

    while ((nbytes = FileRead(SJBlockVfd, (char *)&mytag, sizeof(mytag))) > 0) {
	if (mytag.sjct_dbid == tag->sjct_dbid
	    && mytag.sjct_relid == tag->sjct_relid) {

	    if (i == SJNBLKSIZE) {
		/* fast pseudo-random function */
		i = mytag.sjct_relid % SJNBLKSIZE;
		cachetag = &(SJNBlockCache[i]);
	    }

	    /* save cache tag */
	    cachetag->sjct_dbid = mytag.sjct_dbid;
	    cachetag->sjct_relid = mytag.sjct_relid;
	    cachetag->sjct_base = mytag.sjct_base;

	    return (mytag.sjct_base);
	}
    }

    elog(FATAL, "_sjfindnblocks: cannot get block count for <%d,%d>",
		tag->sjct_dbid, tag->sjct_relid);
}

/*
 *  _sjregnblocks() -- Remember the count of blocks for this relid.
 */

static void
_sjregnblocks(tag)
    SJCacheTag *tag;
{
    int loc;
    int i;
    SJCacheTag *cachetag;
    SJCacheTag mytag;

    cachetag = SJNBlockCache;
    i = 0;
    while (i < SJNBLKSIZE && cachetag->sjct_relid != (ObjectId) 0) {
	if (cachetag->sjct_dbid == tag->sjct_dbid
	    && cachetag->sjct_relid == tag->sjct_relid)
	    break;

	i++;
	cachetag++;
    }

    if (i == SJNBLKSIZE) {
	i = tag->sjct_relid % SJNBLKSIZE;
	cachetag = &(SJNBlockCache[i]);
    }

    cachetag->sjct_dbid = tag->sjct_dbid;
    cachetag->sjct_relid = tag->sjct_relid;
    cachetag->sjct_base = tag->sjct_base;

    /* update block count file */
    if (FileSeek(SJBlockVfd, 0L, SEEK_SET) < 0) {
	elog(FATAL, "_sjregnblocks: cannot seek to zero on block count file");
    }

    loc = 0;
    mytag.sjct_base = tag->sjct_base;

    /* overwrite existing entry, if any */
    while (FileRead(SJBlockVfd, (char *) &mytag, sizeof(mytag)) > 0) {
	if (mytag.sjct_dbid == tag->sjct_dbid
	    && mytag.sjct_relid == tag->sjct_relid) {
	    if (FileSeek(SJBlockVfd, (loc * sizeof(SJCacheTag)),
			 SEEK_SET) < 0)
		elog(FATAL, "_sjregnblocks: cannot seek to loc");
	    if (FileWrite(SJBlockVfd, (char *) tag, sizeof(*tag)) < 0)
		elog(FATAL, "_sjregnblocks: cannot write nblocks");
	    return;
	}
	loc++;
    }

    /* new relation -- write at end of file */
    if (FileWrite(SJBlockVfd, (char *) tag, sizeof(*tag)) < 0)
	elog(FATAL, "_sjregnblocks: cannot write nblocks for new reln");
}
int
sjcommit()
{
    FileSync(SJMetaVfd);
    FileSync(SJCacheVfd);
    FileSync(SJBlockVfd);

    return (SM_SUCCESS);
}

int
sjabort()
{
    return (SM_SUCCESS);
}

/*
 *  SJShmemSize() -- Declare amount of shared memory we require.
 *
 *	The shared memory initialization code creates a block of shared
 *	memory exactly big enough to hold all the structures it needs to.
 *	This routine declares how much space the Sony jukebox cache will
 *	use.
 */

int
SJShmemSize()
{
    int size;
    int nbuckets;
    int nsegs;
    int tmp;

    /* size of cache metadata */
    size =
	MAXALIGN((SJCACHESIZE + 1) * sizeof(SJCacheItem)) +
	    MAXALIGN(sizeof(SJCacheHeader));

    /* nblock cache */
    size += MAXALIGN(SJNBLKSIZE * sizeof(SJCacheTag));

#ifndef HAS_TEST_AND_SET
    size += MAXALIGN(sizeof(*SJNWaiting));
#endif /* ndef HAS_TEST_AND_SET */

    /* size of hash table */
    nbuckets = 1 << (int)my_log2((SJCACHESIZE - 1) / DEF_FFACTOR + 1);
    nsegs = 1 << (int)my_log2((nbuckets - 1) / DEF_SEGSIZE + 1);
    size += MAXALIGN(my_log2(SJCACHESIZE) * sizeof(void *));
    size += MAXALIGN(sizeof(HHDR));
    size += nsegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
    tmp = (int)ceil((double)SJCACHESIZE/BUCKET_ALLOC_INCR);
    size += tmp * BUCKET_ALLOC_INCR *
	(MAXALIGN(sizeof(BUCKET_INDEX)) +
	 MAXALIGN(sizeof(SJHashEntry)));

    /* count shared memory required for jukebox state */
    size += JBShmemSize();

    return (size);
}

/*
 *  sjmaxseg() -- Find highest segment number occupied by platter id plid
 *		  in the on-disk cache.
 *
 *	This routine is called from _pgjb_findoffset().  On entry here,
 *	we hold JBSpinLock, but not SJCacheLock.  We do something a little
 *	dangerous here; we trust the group descriptor metadata that is in
 *	shared memory to reflect accurately the state of the actual cache
 *	file.  This isn't so bad; if there's an inconsistency, there are
 *	exactly two possibilities:
 *
 *		+  There was a crash between metadata and cache update,
 *		   and we'll figure that out later;
 *
 *		+  Some other backend has IO_IN_PROG set on the group we
 *		   are examining, and we need to look at the group desc
 *		   on disk in order to find out if the group is on plid.
 *
 *	The second case basically means that we wind up holding SJCacheLock
 *	during a disk io, but that's a sufficiently rare event that we don't
 *	care.  I can't think of any cleaner way to do this, anyway.
 *
 *	We return the address of the first block of the highest-numbered
 *	extent that we have cached for plid.  If we have none cached, we
 *	return InvalidBlockNumber.
 */

BlockNumber
sjmaxseg(plid)
    ObjectId plid;
{
    int i;
    long seekpos, loc;
    int nbytes;
    BlockNumber last;
    SJGroupDesc *group;

    /* XXX hold the lock for a walk of the entire cache */
    SpinAcquire(SJCacheLock);

    last = InvalidBlockNumber;
    group = (SJGroupDesc *) &(SJCacheBuf[0]);

    /*
     *  Walk backwards along the free list.  If we ever hit an unallocated
     *  block, we can stop searching.  Otherwise, we'll hit the head of the
     *  list when freeprev == -1.
     */

    for (i = SJHeader->sjh_freetail;
	 i != -1 && SJCache[i].sjc_oid != InvalidObjectId;
	 i = SJCache[i].sjc_freeprev) {

	/* if IO_IN_PROG is set, we need to look at the group desc on disk */
	if (SJCache[i].sjc_gflags & SJC_IOINPROG) {
	    seekpos = i * SJBUFSIZE;
	    if ((loc = FileSeek(SJCacheVfd, seekpos, SEEK_SET)) != seekpos) {
		SpinRelease(SJCacheLock);
		elog(NOTICE, "sjmaxseg: cannot seek");
		return (-1);
	    }

	    nbytes = FileRead(SJCacheVfd, (char *) group, sizeof(SJGroupDesc));
	    if (nbytes != sizeof(SJGroupDesc)) {
		SpinRelease(SJCacheLock);
		elog(NOTICE, "sjmaxseg: read of group desc %d failed", i);
		return (-1);
	    }

	    /* sanity checks */
	    if (group->sjgd_magic != SJGDMAGIC
		|| group->sjgd_version != SJGDVERSION) {
		elog(FATAL, "sjmaxseg: cache file corrupt.");
	    }

	    if (group->sjgd_plid == plid) {
		if (group->sjgd_jboffset > last || last == InvalidBlockNumber)
		    last = group->sjgd_jboffset;
	    }
	} else {
	    if (SJCache[i].sjc_plid == plid) {
		if (SJCache[i].sjc_jboffset > last
		    || last == InvalidBlockNumber) {

		    last = SJCache[i].sjc_jboffset;
		}
	    }
	}
    }

    SpinRelease(SJCacheLock);

    return (last);
}

static void
_sjdump()
{
    int i, j;
    int nentries;
    SJCacheItem *item;

    SpinAcquire(SJCacheLock);

    nentries = SJHeader->sjh_nentries;

    printf("jukebox cache metdata: size %d, %d entries, free head %d tail %d",
	   SJCACHESIZE, nentries, SJHeader->sjh_freehead,
	   SJHeader->sjh_freetail);
    if (SJHeader->sjh_flags & SJH_INITING)
	printf(", INITING");
    if (SJHeader->sjh_flags & SJH_INITED)
	printf(", INITED");
    printf("\n");

    for (i = 0; i < SJCACHESIZE; i++) {
	item = &SJCache[i];
	printf("    [%2d] <%ld,%ld,%ld> %d@@%d next %d prev %d flags %s oid %ld\n",
	       i, item->sjc_tag.sjct_dbid, item->sjc_tag.sjct_relid,
	       item->sjc_tag.sjct_base, item->sjc_plid, item->sjc_jboffset,
	       item->sjc_freenext, item->sjc_freeprev,
	       (item->sjc_gflags & SJC_IOINPROG ? "IO_IN_PROG" : "CLEAR"),
	       item->sjc_oid);
	printf("         ");
	for (j = 0; j < SJGRPSIZE; j++) {
	    printf("[%d %c%c]", j,
	    	   (item->sjc_flags[j] & SJC_MISSING ? 'm' : '-'),
	    	   (item->sjc_flags[j] & SJC_ONPLATTER ? 'o' : '-'));
	}
	printf("\n");
    }

    SpinRelease(SJCacheLock);
}

/*
 *  SJInitSemaphore() -- Initialize the 'wait' semaphore for jukebox cache
 *			 pages.
 *
 *	We only do this if we don't have test-and-set locks.
 */

SJInitSemaphore(key)
    IPCKey key;
{
#ifndef HAS_TEST_AND_SET
    int status;

    SJWaitSemId = IpcSemaphoreCreate(IPCKeyGetSJWaitSemaphoreKey(key),
				     1, IPCProtection, 0, &status);
    if (SJWaitSemId < 0) {
	elog(FATAL, "cannot create/attach jukebox semaphore");
    }
#else /* ndef HAS_TEST_AND_SET */
    return;
#endif /* ndef HAS_TEST_AND_SET */
}

#endif /* SONY_JUKEBOX */
@


1.46
log
@SEEK_SET
@
text
@d41 1
a41 1
RcsId("$Header: /faerie/aoki/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.45 1994/02/07 11:44:51 aoki Exp aoki $");
d235 1
@


1.45
log
@ipc.h protos require ipci.h but someone included them backwards all over
the storage directory..
@
text
@d19 1
d41 1
a41 1
RcsId("$Header: /faerie/aoki/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.44 1994/02/02 00:22:27 marc Exp aoki $");
d348 1
a348 1
    if (FileSeek(SJMetaVfd, 0L, L_SET) != 0)
d1246 1
a1246 1
    if ((loc = FileSeek(SJMetaVfd, seekpos, L_SET)) != seekpos)
d1263 1
a1263 1
    if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos)
d1369 1
a1369 1
    if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
d1386 1
a1386 1
    if (FileSeek(SJMetaVfd, seekpos, L_SET) != seekpos) {
d1423 1
a1423 1
    if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos) {
d1660 1
a1660 1
    if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
d1689 1
a1689 1
    if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
d1770 1
a1770 1
    if (FileSeek(SJCacheVfd, seekpos, L_SET) != seekpos) {
d1787 1
a1787 1
    if (FileSeek(SJMetaVfd, seekpos, L_SET) != seekpos) {
d1879 1
a1879 1
    if (FileSeek(SJBlockVfd, 0L, L_SET) != 0) {
d1940 1
a1940 1
    if (FileSeek(SJBlockVfd, 0L, L_SET) < 0) {
d1951 2
a1952 1
	    if (FileSeek(SJBlockVfd, (loc * sizeof(SJCacheTag)), L_SET) < 0)
d2083 1
a2083 1
	    if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos) {
@


1.44
log
@Fixes to make sure we don't expect NameData variables to be NULL terminated
and that we don't write mroe than NAMEDATALEN chars into them
@
text
@d19 1
a20 1
#include "storage/ipci.h"
d40 1
a40 1
RcsId("$Header: /usr/local/devel/postgres.test/src/backend/storage/smgr/RCS/sj.c,v 1.43 1994/02/01 11:55:34 aoki Exp marc $");
@


1.43
log
@fix shmem size allocation calculation to account for padding
@
text
@d40 1
a40 1
RcsId("$Header: /import/faerie/faerie/aoki/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.42 1993/08/10 01:46:51 marc Exp aoki $");
d614 3
a616 2
	path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2);
	sprintf(path, "%s/%.16s", DataDir, &(reln->rd_rel->relname.data[0]));
d618 2
a619 2
	path = (char *) palloc(sizeof(NameData) + 1);
	sprintf(path, "%.16s", &(reln->rd_rel->relname.data[0]));
@


1.42
log
@alpha port
@
text
@d28 1
d40 1
a40 1
RcsId("$Header: /usr/local/devel/postgres.alpha.merge/src/backend/storage/smgr/RCS/sj.c,v 1.41 1993/07/09 22:27:48 andrew Exp marc $");
d168 4
a171 2
    metasize = (SJCACHESIZE * sizeof(SJCacheItem)) + sizeof(SJCacheHeader)
		+ (SJNBLKSIZE * sizeof(SJCacheTag));
d173 5
a177 2
    metasize = (SJCACHESIZE * sizeof(SJCacheItem)) + sizeof(SJCacheHeader)
		+ (SJNBLKSIZE * sizeof(SJCacheTag)) + sizeof(*SJNWaiting);
d1996 7
a2002 1
    size = ((SJCACHESIZE + 1) * sizeof(SJCacheItem)) + sizeof(SJCacheHeader);
d2004 1
a2004 1
    size += sizeof(*SJNWaiting);
d2010 3
a2012 2
    size += my_log2(SJCACHESIZE) + sizeof(HHDR);
    size += nsegs * DEF_SEGSIZE * sizeof(SEGMENT);
d2015 2
a2016 4
            (sizeof(BUCKET_INDEX) + sizeof(SJHashEntry));

    /* nblock cache */
    size += SJNBLKSIZE * sizeof(SJCacheTag);
@


1.41
log
@fix missing argument to _sjfetchgrp and potential for infinite
loop. (if nentries==SJCACHESIZE on entry, the bizarre loop will
never get done.)
@
text
@d39 1
a39 1
RcsId("$Header: /private/src/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.40 1993/06/01 16:19:36 mao Exp andrew $");
d130 1
a130 1
extern int		*ShmemInitStruct();
@


1.40
log
@turn on access to sony jukebox
@
text
@d39 1
a39 1
RcsId("$Header: /private/src/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.39 1993/04/07 09:02:17 mao Exp mao $");
d392 1
a392 1
    start = nentries;
d1015 1
a1015 1
		return (_sjfetchgrp(dbid, relid, blkno));
@


1.39
log
@special hacks for s2k on heel -- new convention on sony page layout
requires data conversion i haven't done yet, so just turn off the
public sj routines until i get this finished.
@
text
@d39 1
a39 1
RcsId("$Header: /private/src/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.38 1993/03/10 05:54:49 mao Exp mao $");
d120 1
a157 3
    /* XXX for s2k 4.1 initial install, no sj interface */
    return (SM_SUCCESS);

a541 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    return (SM_SUCCESS);

a570 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1292 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1448 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1592 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1603 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1621 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

d1709 17
a1726 1
    ObjectId reldbid;
a1732 8
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

    if (reln->rd_rel->relisshared)
	reldbid = (ObjectId) 0;
    else
	reldbid = MyDatabaseId;

d1735 1
a1735 1
    item = _sjfetchgrp(reldbid, reln->rd_id, base, &grpno);
a1804 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

d1817 1
a1817 4
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

    return (SM_FAIL);
a1834 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1859 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

a1958 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    return (SM_SUCCESS);

a1968 3
    /* XXX for s2k 4.1 initial install, no sj interfaces */
    elog(WARN, "sony jukebox conversion for S2K on heel not yet complete");

@


1.38
log
@fix up a couple of very old bugs in the cache management code
@
text
@d39 1
a39 1
RcsId("$Header: /usr/local/dev/postgres/newtree/src/backend/storage/smgr/RCS/sj.c,v 1.36 1992/09/05 23:41:58 mao Exp $");
d157 3
d544 3
d576 3
d1301 3
d1460 3
d1607 3
d1621 3
d1642 3
d1740 3
d1820 3
d1835 3
d1856 3
d1884 3
d1986 3
d1999 3
@


1.37
log
@new design -- don't store final block of all sony relations on magnetic
disk.
@
text
@d39 1
a39 1
RcsId("$Header: /private/src/postgres/src/backend/storage/smgr/RCS/sj.c,v 1.36 1992/09/05 23:41:58 mao Exp mao $");
d327 1
d340 3
d344 1
d385 22
a406 46
     */

    if (nentries == SJCACHESIZE || nentries == 0) {
	cur = &(SJCache[i]);
	cur->sjc_freeprev = i - 1;

	if (i == SJCACHESIZE - 1) {
	    cur->sjc_freenext = -1;
	} else {
	    cur->sjc_freenext = i + 1;
	}

	/* list head, tail pointers */
	SJHeader->sjh_freehead = 0;
	SJHeader->sjh_freetail = SJCACHESIZE - 1;
    } else {
	for (i = 0; i < nentries; i++) {
	    cur = &(SJCache[i]);

	    if (i == 0)
		cur->sjc_freeprev = SJCACHESIZE - 1;
	    else
		cur->sjc_freeprev = i - 1;

	    if (i == nentries - 1)
		cur->sjc_freenext = -1;
	    else
		cur->sjc_freenext = i + 1;
	}

	for (i = nentries; i < SJCACHESIZE; i++) {
	    cur = &(SJCache[i]);

	    /* mark this as unused by setting oid to invalid object id */
	    cur->sjc_oid = InvalidObjectId;

	    if (i == nentries)
		cur->sjc_freeprev = -1;
	    else
		cur->sjc_freeprev = i - 1;

	    if (i == SJCACHESIZE - 1)
		cur->sjc_freenext = 0;
	    else
		cur->sjc_freenext = i + 1;
	}
a407 5
	/* list head, tail pointers */
	SJHeader->sjh_freehead = nentries;
	SJHeader->sjh_freetail = nentries - 1;
    }

d409 2
d560 2
d563 6
d605 14
a618 2
    /* return a fake file descriptor */
    return (0);
d1448 1
a1448 7
    /*
     *  Even though we can't really unlink the relation, we return
     *  success here, so that users can destroy tables (remove the
     *  pg_class pointers to them).
     */

    return (SM_SUCCESS);
d1588 9
a1596 2
    /* return a fake fd */
    return (0);
d1603 2
@


1.36
log
@allow location of data/ to be supplied in environment or on command line
@
text
@d39 1
a39 1
RcsId("$Header: /private/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.35 1992/06/30 22:31:43 mao Exp mao $");
a581 2
    SJCacheItem *item;
    SJGroupDesc *group;
a582 6
    ObjectId dbid;
    ObjectId relid;
    File vfd;
    int grpno;
    int i;
    char *path;
d619 2
a620 14
    /* last thing to do is to create the mag-disk file to hold last page */
    if (reln->rd_rel->relisshared) {
	path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2);
	sprintf(path, "%s/%.16s", DataDir, &(reln->rd_rel->relname.data[0]));
    } else {
	path = (char *) palloc(sizeof(NameData) + 1);
	sprintf(path, "%.16s", &(reln->rd_rel->relname.data[0]));
    }

    vfd = FileNameOpenFile(path, O_CREAT|O_RDWR|O_EXCL, 0600);

    pfree(path);

    return (vfd);
d1450 7
a1456 1
    return (SM_FAIL);
d1596 2
a1597 9
    char *path;
    int fd;
    extern char *relpath();

    path = relpath(&(reln->rd_rel->relname.data[0]));

    fd = FileNameOpenFile(path, O_RDWR, 0600);

    return (fd);
a1603 2
    FileClose(reln->rd_fd);

@


1.35
log
@initialize those variables before you go passing them around.
@
text
@d39 1
a39 1
RcsId("$Header: /private/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.34 1992/06/11 17:49:25 mao Exp mao $");
d100 3
d155 1
a155 2
    char *pghome;
    char path[SJPATHLEN];
d239 2
a240 2
    pghome = GetPGHome();
    sprintf(path, "%s/data/%s", pghome, SJCACHENAME);
d257 5
a261 1
    sprintf(path, "%s/data/%s", pghome, SJMETANAME);
d277 5
a281 1
    sprintf(path, "%s/data/%s", pghome, SJBLOCKNAME);
d297 2
d590 1
a590 1
    char path[SJPATHLEN];
d628 7
a634 4
    if (reln->rd_rel->relisshared)
	strcpy(path, "../");
    else
	path[0] = '\0';
d636 1
a636 1
    strncpy(path, &(reln->rd_rel->relname.data[0]), sizeof(NameData));
d638 1
a638 1
    vfd = FileNameOpenFile(path, O_CREAT|O_RDWR|O_EXCL, 0600);
@


1.34
log
@blocks allocated to platters according to a more sensible policy
@
text
@d39 1
a39 1
RcsId("$Header: /private/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.33 1992/05/28 17:09:35 mao Exp mao $");
d757 1
a757 1
	plmdata->plid = plattup->t_oid;
@


1.33
log
@checkin to sync up for testing -- changed a comment describing our
allocation strategy, but not the actual strategy.  need to get
back tot his.
@
text
@d39 1
a39 1
RcsId("$Header: /private/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.32 1992/01/29 21:32:50 mao Exp mao $");
d125 5
a129 3
extern HTAB	*ShmemInitHash();
extern int	*ShmemInitStruct();
extern Relation	RelationIdGetRelation();
d696 2
a697 1
 *	    platter <= 2/3 full.
d703 4
a706 1
 *	       allocate it there.
d708 2
a709 1
 *	    -  Otherwise, allocate the extent on any platter <= 2/3 full.
d727 2
d735 8
d745 5
a750 1

d754 22
a775 7
    /* get platter OID, name */
    plmdata->plid = plattup->t_oid;
    d = (Datum) heap_getattr(plattup, buf, Anum_pg_platter_plname,
			     platdesc, &isnull);
    platname = DatumGetName(d);
    strncpy(plname, &(platname->data[0]), sizeof(NameData));
    plname[sizeof(NameData)] = '\0';
a776 5
    /* done */
    ReleaseBuffer(buf);
    heap_endscan(platscan);
    heap_close(plat);

d783 6
@


1.32
log
@pass address of spinlock, not spinlock itself
@
text
@d39 1
a39 1
RcsId("$Header: /n/hermes/usr5/postgres/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.31 1991/11/14 19:40:44 kemnitz Exp mao $");
d691 12
a702 2
 *	For now, this makes a really stupid choice.  Need to think about
 *	the right way to go about this.
@


1.31
log
@protos checkin.
@
text
@d39 1
a39 1
RcsId("$Header: RCS/sj.c,v 1.30 91/11/08 20:18:35 mao Exp Locker: kemnitz $");
d89 1
a89 1
    S_LOCK(item->sjc_iolock);
@


1.30
log
@file mode on open is 0600
@
text
@d39 1
a39 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.29 1991/11/07 06:05:53 mao Exp mao $");
a126 1
extern int	tag_hash();
d326 1
a326 1
    nread = FileRead(SJMetaVfd, SJCache, nbytes);
d1520 1
a1520 1
    entry = (SJHashEntry *) hash_search(SJCacheHT, tagP, op, foundP);
d1646 1
a1646 1
    if (FileRead(SJCacheVfd, &gdesc, sizeof(gdesc)) < 0) {
d1823 1
a1823 1
    while ((nbytes = FileRead(SJBlockVfd, &mytag, sizeof(mytag))) > 0) {
d1888 1
a1888 1
    while (FileRead(SJBlockVfd, &mytag, sizeof(mytag)) > 0) {
@


1.29
log
@at initialization time, free list head points to first unallocated
extent in the cache.  also, walk backwards along free list when computing
first unallocated extent on a platter.
@
text
@d39 1
a39 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.28 1991/10/29 06:34:27 mao Exp mao $");
d1555 1
a1555 1
    fd = FileNameOpenFile(path, O_RDWR, 0666);
@


1.28
log
@remove debugging code
@
text
@d39 1
a39 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.27 1991/10/29 06:33:22 mao Exp mao $");
d342 1
a342 6
    /*
     *  Add every group that appears in the cache to the hash table.  Since
     *  we have no references to any of these groups yet, they all appear on
     *  the free list.
     */

a349 8
	/* link up free list -- no info yet, so just link groups in order */
	cur->sjc_freeprev = i - 1;
	if (i == SJCACHESIZE - 1) {
	    cur->sjc_freenext = -1;
	} else {
	    cur->sjc_freenext = i + 1;
	}

d360 7
a366 2
     *  Put the rest of the cache entries on the free list, marking them as
     *  missing by setting the oid entry to InvalidObjectId.
d369 1
a369 1
    for (i = nentries; i < SJCACHESIZE; i++) {
a370 1
	cur->sjc_oid = InvalidObjectId;
d372 1
d378 39
a420 2
    SJHeader->sjh_freehead = 0;
    SJHeader->sjh_freetail = SJCACHESIZE - 1;
d2005 9
a2013 1
    for (i = 0; i < SJHeader->sjh_nentries; i++) {
@


1.27
log
@be polite -- allocate memory before you write on it.
@
text
@d39 1
a39 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.26 1991/10/29 04:12:35 mao Exp $");
a52 2
static SJCacheTag	*DebugBlockEnd;	/* pointer to nblock cache */
int			*MaoDebugInt = 0x10072000;
a194 1
    DebugBlockEnd = (SJCacheTag *) cacheblk;
a1805 1
	    if (cachetag >= DebugBlockEnd) _punt();
a1846 1
    if (cachetag >= DebugBlockEnd) _punt();
a2083 5

_punt()
{
	elog(NOTICE, "found it");
}
@


1.26
log
@fix nblock cache code -- compilation failed
@
text
@d39 1
a39 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.25 1991/10/29 00:11:52 mao Exp $");
d53 2
d165 2
a166 1
    metasize = (SJCACHESIZE * sizeof(SJCacheItem)) + sizeof(SJCacheHeader);
d169 1
a169 1
		+ sizeof(*SJNWaiting);
d197 1
d1809 1
d1835 1
a1835 1
    cachetag = &(SJNBlockCache[0]);
d1851 1
d2089 5
@


1.25
log
@fix up some botches in cache management
@
text
@d39 1
a39 1
RcsId("$Header: RCS/sj.c,v 1.24 91/10/04 17:52:59 mao Exp Locker: mao $");
d1782 2
a1783 2
	if (cachetag->sjct_dbid == tag.sjct_dbid
	    && cachetag->sjct_relid == tag.sjct_relid) {
d1830 1
a1830 1
    cachetag = SJNBlockCache;
d1833 2
a1834 2
	if (cachetag->sjct_dbid == tag.sjct_dbid
	    && cachetag->sjct_relid == tag.sjct_relid)
d1842 2
a1843 2
	i = tag.sjct_relid % SJNBLKSIZE;
	cachetag = &(SJCacheTag[i]);
d1846 3
a1848 3
    cachetag->sjct_dbid = tag.sjct_dbid;
    cachetag->sjct_relid = tag.sjct_relid;
    cachetag->sjct_base = tag.sjct_base;
@


1.24
log
@starting to optimize -- cut way back on the amount of io we do for reads
and writes on the mag disk cache.
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.23 1991/10/03 15:07:32 mao Exp mao $");
d52 1
d179 1
a179 1
     *  and cache entries.
d190 3
d337 5
d1775 2
d1779 11
d1797 12
d1826 2
d1830 20
d1922 3
@


1.23
log
@on create, release the cache lock before returning.  oops.
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.22 1991/10/03 00:56:55 mao Exp $");
d115 1
d1212 1
d1269 16
a1284 1
    if (_sjreadgrp(item, grpno) == SM_FAIL) {
d1289 6
a1294 2
    offset = (grpoffset * BLCKSZ) + JBBLOCKSZ;
    bcopy(buffer, &(SJCacheBuf[offset]), BLCKSZ);
d1296 2
a1297 9
    /*
     *  It's the highest-numbered block in this relation, and it's not on
     *  the platter yet.
     *
     *  NOTE:  by doing this, we've just changed the number of blocks in the
     *  relation.  We need to hold the extend lock on this reln until end
     *  of transaction, since no one will be able to see the new block until
     *  then.
     */
d1299 6
a1304 1
    item->sjc_flags[grpoffset] |= SJC_CLEAR;
d1306 2
a1307 2
    /* finally, write out the extent with the new block in it */
    if (_sjwritegrp(item, grpno) == SM_FAIL) {
d1309 1
d1313 1
d1541 1
d1563 13
a1575 1
    if (_sjreadgrp(item, grpno) == SM_FAIL) {
d1580 6
a1585 2
    offset = ((blocknum % SJGRPSIZE) * BLCKSZ) + JBBLOCKSZ;
    bcopy(&(SJCacheBuf[offset]), buffer, BLCKSZ);
d1593 31
d1636 1
d1658 2
d1662 16
a1677 1
    SET_IO_LOCK(item);
d1679 2
a1680 1
    if (_sjreadgrp(item, grpno) == SM_FAIL) {
d1686 9
a1694 2
    offset = (which * BLCKSZ) + JBBLOCKSZ;
    bcopy(buffer, &(SJCacheBuf[offset]), BLCKSZ);
d1696 2
a1697 1
    if (_sjwritegrp(item, grpno) == SM_FAIL) {
@


1.22
log
@cleanup and bug fixes -- wisconsin benchmark now works for jukebox
relations
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.21 1991/09/28 20:04:03 mao Exp mao $");
d556 1
d559 3
a561 4
     *  By here, cache is initialized and we have exclusive access to
     *  metadata.  We are aggressively lazy, and will not allocate an
     *  initial extent for this relation until it's actually used.  We
     *  just register an initial block count of zero.
@


1.21
log
@checking in in order to sync up and get a new tree; this version fixes
many bugs, but still contains a bunch of debugging code, and should not
be shipped.
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.20 1991/09/11 07:19:37 mao Exp mao $");
d132 6
a137 8
 *	in shared memory and open the cache on mag disk.  If this code is
 *	executed by the postmaster, we'll create (but not populate) the
 *	cache memory.  The first backend to run that touches the cache
 *	initializes it.  All other backends running simultaneously will
 *	only wait for this initialization to complete if they need to get
 *	data out of the cache.  Otherwise, they'll return successfully
 *	immediately after attaching the cache memory, and will let their
 *	older sibling do all the work.
d155 1
a155 1
     *  the 
d800 8
a807 1
    /* if there are no writes to force to the jukebox, we're done */
d809 9
a817 5
    if (!(item->sjc_gflags & SJC_DIRTY)) {
	for (i = 0; i < SJGRPSIZE; i++) {
	    if (item->sjc_flags[i] & SJC_DIRTY) {
		dirty = true;
		break;
a819 2
    } else {
	dirty = true;
d851 1
a851 1
	if (item->sjc_flags[i] & SJC_DIRTY) {
d859 1
a859 17
    /* if necessary, put the highest block in the relation on mag disk */
    if ((item->sjc_tag.sjct_base + SJGRPSIZE + 1) >= nblocks) {
	grpoffset = ((nblocks - 1) % SJGRPSIZE);

	if (item->sjc_flags[grpoffset] & SJC_DIRTY) {

	    /* COMPLETELY bogus.  Won't work with any sort of sharing. */
	    reln = RelationIdGetRelation(item->sjc_tag.sjct_relid);
	    loc = FileSeek(reln->rd_fd, 0L, L_SET);
	    where = JBBLOCKSZ + ((nblocks - 1) * BLCKSZ);
	    FileWrite(reln->rd_fd, &(SJCacheBuf[where]), BLCKSZ);

	    item->sjc_flags[grpoffset] &= ~SJC_DIRTY;
	}
    }

    if (pgjb_wrtextent(item, &(SJCacheBuf[0])) == SM_FAIL) {
a1056 15

    /* XXX debug */
    {
	int i;
	char *p;

	p = &(SJCacheBuf[JBBLOCKSZ]);

	for (i = 0; i < SJGRPSIZE; i++) {
	    if (!(item->sjc_flags[i] & SJC_MISSING))
		_sjbuftrap(item->sjc_tag.sjct_base + i, p);

	    p += BLCKSZ;
	}
    }
a1177 15
    /* XXX debug */
    {
	int i;
	char *p;

	p = &(SJCacheBuf[JBBLOCKSZ]);

	for (i = 0; i < SJGRPSIZE; i++) {
	    if (!(item->sjc_flags[i] & SJC_MISSING))
		_sjbuftrap(item->sjc_tag.sjct_base + i, p);

	    p += BLCKSZ;
	}
    }

a1253 5
     *
     *  The check of DIRTY and ONPLATTER in case of not MISSING is to handle
     *  the case where some other backend started to do the extend, then
     *  aborted.  In fact, this is probably an error, and the code to handle
     *  it may not work correctly; should think more about this.
d1257 2
a1258 5
	if (item->sjc_flags[grpoffset] & SJC_DIRTY
	    || item->sjc_flags[grpoffset] & SJC_ONPLATTER) {
	    SpinRelease(SJCacheLock);
	    elog(WARN, "sjextend: cache botch: next block in group present");
	}
d1276 2
a1277 14
     *  It's the highest-numbered block in this relation, and it's dirty,
     *  now.  NOTE:  by doing this, we've just changed the number of blocks
     *  in the relation.  We need to hold the extend lock on this reln
     *  until end of transaction, since no one will be able to see the new
     *  block until then.
     */

    item->sjc_flags[grpoffset] |= SJC_DIRTY;

    /*
     *  Since we just added a new block to the relation, the old highest-
     *  numbered block is about to become a candidate for movement to the
     *  optical disk jukebox.  Until now, it's been cached on magnetic
     *  disk.  We need to mark it dirty.
d1279 4
a1282 5
     *  There are two possibilities:  if the old block is in the same
     *  extent as the new block, then we can just mark it dirty directly,
     *  since we have that group already.  If this is a brand-new extent,
     *  then we need to instantiate the extent that precedes it, and mark
     *  the highest-numbered block in that extent dirty.
d1285 1
a1285 24
    if (grpoffset == 0) {

	/*
	 *  Hard case -- we just allocated a new extent.  We need to
	 *  instantiate the previous extent and mark the block dirty
	 *  there.  This is complicated enough to wrap up in a separate
	 *  routine.
	 */

	if (nblocks > 0)
	    _sjdirtylast(tag.sjct_dbid, tag.sjct_relid, nblocks - 1);

    } else {

	/*
	 *  Easy case -- old block is in this extent.  Decrement the
	 *  offset and mark the block dirty.  It is bad news if the
	 *  old highest-numbered block is on a platter or missing; these
	 *  should never happen.
	 */

	grpoffset--;
	if (item->sjc_flags[grpoffset] & SJC_MISSING
	    || item->sjc_flags[grpoffset] & SJC_ONPLATTER) {
a1286 7
	    elog(WARN, "sjextend: old 'last block' not writable");
	}

	/* okay, mark it dirty */
	item->sjc_flags[grpoffset] |= SJC_DIRTY;
    }

a1301 76
/*
 *  _sjdirtyblock() -- Mark the requested block in a relation dirty.
 *
 *	When we extend a relation, it gets a new last block.  The last
 *	block of every relation is always stored on magnetic disk, so
 *	when we do an extend, we need to mark the old last block dirty.
 *	This will guarantee that it gets kicked out to the optical
 *	platter later, and that the new last block can be safely written
 *	to the magnetic disk file for caching the relation's last block.
 */

static void
_sjdirtylast(dbid, relid, blkno)
    ObjectId dbid;
    ObjectId relid;
    int blkno;
{
    OffsetNumber base;
    int grpno;
    int i;
    long seekpos;
    long loc;
    int nbytes;
    char *buf;
    int which;
    SJCacheItem *item;

    base = ((blkno / SJGRPSIZE) * SJGRPSIZE);
    which = (blkno % SJGRPSIZE);
    item = _sjfetchgrp(dbid, relid, base, &grpno);

    SpinAcquire(SJCacheLock);
    SET_IO_LOCK(item);

    /* mark it dirty */
    if ((item->sjc_flags[which] & SJC_MISSING)
	|| (item->sjc_flags[which] & SJC_ONPLATTER)) {

	_sjunwait_io(item);
	_sjunpin(item);

	elog(WARN, "_sjdirtyblock: old 'last block' not writable");
    }

    item->sjc_flags[which] |= SJC_DIRTY;

    /* just need to update the metadata file */
    seekpos = grpno * sizeof(*item);

    if ((loc = FileSeek(SJMetaVfd, seekpos, L_SET)) != seekpos) {
	_sjunwait_io(item);
	_sjunpin(item);
	elog(WARN, "_sjdirtyblock: cache metadata file seek failed");
    }

    nbytes = sizeof(*item);
    buf = (char *) item;
    while (nbytes > 0) {
	i = FileWrite(SJMetaVfd, buf, nbytes);

	if (i < 0) {
	    _sjunwait_io(item);
	    _sjunpin(item);
	    elog(WARN, "_sjdirtyblock: cache metadata file write failed");
	}

	nbytes -= i;
	buf += i;
    }

    _sjunwait_io(item);
    _sjunpin(item);

    FileSync(SJMetaVfd);
}

a1341 15
    /* XXX debug */
    {
	int i;
	char *p;

	p = &(SJCacheBuf[JBBLOCKSZ]);

	for (i = 0; i < SJGRPSIZE; i++) {
	    if (!(item->sjc_flags[i] & SJC_MISSING))
		_sjbuftrap(item->sjc_tag.sjct_base + i, p);

	    p += BLCKSZ;
	}
    }

d1385 1
a1385 1
    item->sjc_gflags = SJC_DIRTY;
a1548 2
    _sjbuftrap(blocknum, buffer);

a1567 2
    _sjbuftrap(blocknum, buffer);

a1588 1
    item->sjc_flags[which] |= SJC_DIRTY;
d1647 1
d1656 1
a1656 1
    _sjfindnblocks(&tag);
d1679 1
a1679 2
	    tag->sjct_base = mytag.sjct_base;
	    return;
d1894 1
a1894 2
	    printf("[%d %c%c%c]", j,
	    	   (item->sjc_flags[j] & SJC_DIRTY ? 'd' : '-'),
a1926 21
#include "storage/bufpage.h"

_sjbuftrap(blkno, page)
    OffsetNumber blkno;
    Page page;
{
    HeapTuple htup;

    if (PageIsEmpty(page) || ((PageHeader) page)->pd_lower == 0)
	return;

    htup = (HeapTuple) PageGetItem(page, PageGetItemId(page, 0));

    if (ItemPointerGetBlockNumber(&(htup->t_ctid)) != blkno)
	_sjtrap();
}

_sjtrap()
{
    elog(NOTICE, "got that puppy");
}
@


1.20
log
@flushes to platters sort of working; sometimes we get a small hole in
an extent.  need to try to figure out what is going on in pgjb_wrtextent,
in the case where we parcel up the write into pieces.
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.19 1991/09/10 23:27:19 mao Exp mao $");
d111 1
d850 2
d853 2
a854 2
    if ((item->sjc_tag.sjct_base + SJGRPSIZE) >= nblocks) {
	grpoffset = (nblocks % SJGRPSIZE) - 1;
d907 1
a907 1
	    return (_sjfetchgrp(dbid, relid, blkno));
d1066 15
d1202 4
a1205 9
    nbytes = SJBUFSIZE;
    buf = &(SJCacheBuf[0]);
    while (nbytes > 0) {
	i = FileWrite(SJCacheVfd, buf, nbytes);
	if (i < 0)
	    return (SM_FAIL);
	nbytes -= i;
	buf += i;
    }
d1207 1
a1207 1
    FileSync(SJCacheVfd);
d1209 3
a1211 2
    return (SM_SUCCESS);
}
d1213 2
a1214 16
static int
_sjreadgrp(item, grpno)
    SJCacheItem *item;
    int grpno;
{
    long seekpos;
    long loc;
    int nbytes, i;
    char *buf;
    SJGroupDesc *gdesc;

    /* get the group from the cache file */
    seekpos = grpno * SJBUFSIZE;
    if ((loc = FileSeek(SJCacheVfd, seekpos, L_SET)) != seekpos) {
	elog(NOTICE, "_sjreadgrp: cannot seek");
	return (SM_FAIL);
d1220 2
a1221 3
	i = FileRead(SJCacheVfd, buf, nbytes);
	if (i < 0) {
	    elog(NOTICE, "_sjreadgrp: read failed");
a1222 1
	}
d1227 1
a1227 9
    gdesc = (SJGroupDesc *) &(SJCacheBuf[0]);

    if (gdesc->sjgd_magic != SJGDMAGIC
	|| gdesc->sjgd_version != SJGDVERSION
	|| gdesc->sjgd_groupoid != item->sjc_oid) {

	elog(NOTICE, "_sjreadgrp: trashed cache");
	return (SM_FAIL);
    }
a1231 7
int
sjunlink(reln)
    Relation reln;
{
    return (SM_FAIL);
}

d1329 1
d1332 46
d1393 141
d1730 2
d1751 2
d2114 21
@


1.19
log
@get rid of some curiosities in the block and extent allocation code.
we are now agressively lazy, and will not allocate even the initial
extent for a relation until it's needed.  this simplifies the code a
lot.
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.18 1991/09/10 06:41:50 mao Exp $");
d732 4
d737 2
a738 2
     *  If the cache is full, we call a routine to get rid of the least
     *  recently used group.
d741 1
a741 4
    if (SJHeader->sjh_nentries == SJCACHESIZE)
	elog(FATAL, "_sjallocgrp:  no groups on free list!");
    else
	*grpno = _sjgetgrp();
a744 3
    /* bump ref count */
    _sjtouch(item);

d759 13
d787 4
d832 17
a849 2
    nblocks = _sjfindnblocks(&(item->sjc_tag));

d954 1
a954 1
	    /* read the extent */
d957 4
a960 1
	    /* release IO lock */
d1000 1
a1000 1
    ScanKeyEntryInitialize(&skey[0], 0x0, Anum_pg_plmap_plblkno,
d1234 1
d1249 4
d1419 1
a1419 1
    group->sjgd_relblkno = 0;
@


1.18
log
@work on cache management -- allocate extents properly
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.17 1991/09/09 23:58:55 mao Exp mao $");
d532 3
d560 3
a562 1
     *  metadata.  Allocate an initial (empty) extent in the cache.
d565 9
a573 1
    _sjnewextent(reln, 0);
d972 8
d981 4
a984 4
	_sjunwait_io(item);
	elog(WARN, "_sjrdextent: cannot find <%d,%d,%d>",
		   item->sjc_tag.sjct_dbid, item->sjc_tag.sjct_relid,
		   item->sjc_tag.sjct_base);
d1130 1
a1226 1
    int blkno;
d1242 1
a1242 2
    if (((nblocks + 1) % SJGRPSIZE) == 0) {
	base += SJGRPSIZE;
d1262 1
d1265 1
a1278 1
    grpoffset = nblocks % SJGRPSIZE;
d1298 1
a1298 1
    offset = ((blkno % SJGRPSIZE) * BLCKSZ) + JBBLOCKSZ;
d1492 6
@


1.17
log
@use GetPGHome instead of using getenv directly
@
text
@d39 1
a39 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.16 1991/09/05 23:26:02 hong Exp mao $");
d1791 1
a1791 1
		if (group->sjgd_jboffset > last)
d1796 3
a1798 1
		if (SJCache[i].sjc_jboffset > last)
d1800 1
@


1.16
log
@fix a bug in shared memory size calculation
@
text
@a7 2
#include <sys/file.h>

d13 1
d17 2
d39 1
a39 1
RcsId("$Header: RCS/sj.c,v 1.15 91/08/22 06:33:09 mao Exp Locker: mao $");
a124 1
extern char	*getenv();
d231 1
a231 3
    if ((pghome = getenv("POSTGRESHOME")) == (char *) NULL)
	pghome = "/usr/postgres";

@


1.15
log
@bug fixes to code that handles flushing, fetching bytes from the
jukebox
@
text
@d15 1
d38 1
a38 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.14 1991/08/13 22:00:30 mao Exp $");
d1702 1
d1713 5
a1717 4
    size += my_log2(SJCACHESIZE) + sizeof(HHDR)
          + nsegs * DEF_SEGSIZE * sizeof(SEGMENT)
          + (int)ceil((double)SJCACHESIZE/BUCKET_ALLOC_INCR)*BUCKET_ALLOC_INCR*
             (sizeof(BUCKET_INDEX) + sizeof(SJHashEntry));
@


1.14
log
@separate routine now initializes jukebox wait semaphore; this is to
permit the postmaster to find this semaphore on shutdown
@
text
@d37 1
a37 1
RcsId("$Header: /local/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.13 1991/08/08 05:53:28 mao Exp $");
a49 1
static SJNBlock		*SJNBlockList;	/* linked list of nblocks by relid */
a281 3
    /* haven't computed block counts for any relations yet */
    SJNBlockList = (SJNBlock *) NULL;

d382 1
a382 1
    SJHeader->sjh_freetail = SJCACHESIZE;
d694 1
a694 1
    bcopy(&(item->sjc_plname.data[0]), plname, sizeof(NameData));
d806 1
a806 1
    nblocks = sjnblocks(&(item->sjc_tag));
d906 1
a906 1
	    bcopy((char *) &(item->sjc_tag), (char *) &tag, sizeof(tag));
a1213 8
    if (reln->rd_rel->relisshared)
	tag.sjct_dbid = (ObjectId) 0;
    else
	tag.sjct_dbid = MyDatabaseId;

    tag.sjct_relid = reln->rd_id;
    tag.sjct_base = base;

d1223 1
d1228 8
d1469 1
d1478 1
a1478 1
    item = _sjfetchgrp(reldbid, reln->rd_id, blocknum / SJGRPSIZE, &grpno);
d1480 2
d1509 1
d1519 3
a1521 1
    item = _sjfetchgrp(reldbid, reln->rd_id, blocknum / SJGRPSIZE, &grpno);
a1614 1
    SJNBlock *l;
a1617 11
    /* see if we already computed the block count */
    l = SJNBlockList;

    while (l != (SJNBlock *) NULL) {
	if (l->sjnb_relid == tag->sjct_relid && l->sjnb_dbid == tag->sjct_dbid)
	    return (l->sjnb_nblocks);

	l = l->sjnb_next;
    }

    /* nope, need to do some work */
a1642 1
    SJNBlock *l;
a1644 23
    l = SJNBlockList;

    /* overwrite old value, if one exists */
    while (l != (SJNBlock *) NULL) {

	if (l->sjnb_relid == tag->sjct_relid
	    && l->sjnb_dbid == tag->sjct_dbid) {
	    l->sjnb_nblocks = (int) tag->sjct_base;
	    break;
	}
	l = l->sjnb_next;
    }

    /* otherwise, allocate new slot and write new value */
    if (l == (SJNBlock *) NULL) {
	l = (SJNBlock *) palloc(sizeof(SJNBlock));
	l->sjnb_relid = tag->sjct_relid;
	l->sjnb_dbid = tag->sjct_dbid;
	l->sjnb_nblocks = (int) tag->sjct_base;
	l->sjnb_next = SJNBlockList;
	SJNBlockList = l;
    }

d1659 1
a1659 1
	    if (FileWrite(SJBlockVfd, (char *) &mytag, sizeof(mytag)) < 0)
d1667 1
a1667 4
    mytag.sjct_dbid = tag->sjct_dbid;
    mytag.sjct_relid = tag->sjct_relid;

    if (FileWrite(SJBlockVfd, (char *) &mytag, sizeof(mytag)) < 0)
d1673 3
a1675 2
    /* XXX should free the list, but it's in the wrong mcxt */
    SJNBlockList = (SJNBlock *) NULL;
a1682 3
    /* XXX should free the list, but it's in the wrong mcxt */
    SJNBlockList = (SJNBlock *) NULL;

@


1.13
log
@simple jukebox interactions work correctly.
@
text
@d37 1
a37 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.12 1991/08/06 08:09:21 mao Exp mao $");
a138 4
 *
 *	The 'key' argument is the IPC key used in this backend (or postmaster)
 *	for initializing shared memory and semaphores.  Since we need a
 *	wait lock, we need this.
d142 1
a142 2
sjinit(key)
    IPCKey key;
a207 14
#ifndef HAS_TEST_AND_SET
    /*
     *  Finally, we need the wait semaphore if this system does not support
     *  test-and-set locks.
     */

    SJWaitSemId = IpcSemaphoreCreate(IPCKeyGetSJWaitSemaphoreKey(key),
				     1, IPCProtection, 0, &status);
    if (SJWaitSemId < 0) {
	SpinRelease(SJCacheLock);
	return (SM_FAIL);
    }
#endif /* ndef HAS_TEST_AND_SET */

d1883 23
@


1.12
log
@MyDatabaseName, MyDatabaseId are extern, not static
@
text
@d37 1
a37 1
RcsId("$Header: RCS/sj.c,v 1.11 91/08/06 01:41:44 mao Exp Locker: mao $");
d1719 3
@


1.11
log
@real jukebox support is in, but is untested (initialization still works)
@
text
@d37 1
a37 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.10 1991/08/03 00:29:18 mao Exp mao $");
d41 2
a42 2
static ObjectId		MyDatabaseId;	/* OID of database we have open */
static Name		MyDatabaseName;	/* name of database we have open */
@


1.10
log
@add (some) real jukebox calls
@
text
@d27 1
d37 1
a37 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.9 1991/07/29 16:52:28 mer Exp mao $");
d46 1
d71 2
a72 2
/* static buffer is for data transfer -- SJGRPSIZE blocks + descriptor block */
static char	SJCacheBuf[(BLCKSZ * SJGRPSIZE) + JBBLOCKSZ];
d74 21
a94 2
/* used in sj.c, pgjb.c */
int		SJBufSize = ((BLCKSZ * SJGRPSIZE) + JBBLOCKSZ);
d96 2
d108 2
d116 2
d125 1
d286 16
d356 6
a361 1
    /* add each entry to the hash table, and set up link pointers */
d364 1
a364 27
	result = (SJHashEntry *) hash_search(SJCacheHT, &(cur->sjc_tag),
					     HASH_ENTER, &found);

	/*
	 *  If the hash table is corrupted, or the entry is already in the
	 *  table, then we're in trouble and need to surrender.  When we
	 *  release our initialization lock on the cache metadata, someone
	 *  else may come along later and try to reinitialize it.  They'll
	 *  fail, too, since we leave things trashed here.  Rather than try
	 *  to clean up, however, we assume that failing fast is the right
	 *  answer.  Since this is catastrophic, other backends probably
	 *  *should* fail.
	 */

	if (result == (SJHashEntry *) NULL) {
	    SJHeader->sjh_flags &= ~SJH_INITING;
	    _sjunwait_init();
	    elog(FATAL, "sj cache hash table corrupted");
	}

	if (found) {
	    SJHeader->sjh_flags &= ~SJH_INITING;
	    _sjunwait_init();
	    elog(FATAL, "duplicate group in sj cache file: <%d,%d,%d>",
		 cur->sjc_tag.sjct_dbid, cur->sjc_tag.sjct_relid, 
		 cur->sjc_tag.sjct_base);
	}
d369 7
a375 6
	/* link up lru list -- no info yet, so just link groups in order */
	cur->sjc_lruprev = i - 1;
	if (i == nentries - 1)
	    cur->sjc_lrunext = -1;
	else
	    cur->sjc_lrunext = i + 1;
d386 4
a389 2
    /* set up cache metadata header struct */
    SJHeader->sjh_nentries = nentries;
d391 10
a400 4
    if (nentries > 0)
	SJHeader->sjh_lruhead = 0;
    else
	SJHeader->sjh_lruhead = -1;
d402 4
a405 1
    SJHeader->sjh_lrutail = nentries - 1;
d446 1
a446 1
 *  sjunwait_io() -- Release IO lock on the jukebox cache.
d455 2
a456 2
void
sjunwait_io(item)
a555 1
    SJHashEntry *entry;
a557 1
    bool found;
d581 1
a581 1
     *  metadata.  Allocate a group in the cache.
d584 1
a584 1
    item = _sjallocgrp(&grpno);
d586 1
a587 89
	item->sjc_tag.sjct_dbid = (ObjectId) 0;
    else
	item->sjc_tag.sjct_dbid = MyDatabaseId;

    item->sjc_tag.sjct_relid = (ObjectId) reln->rd_id;
    item->sjc_tag.sjct_base = (BlockNumber) 0;

    entry = (SJHashEntry *) hash_search(SJCacheHT, item, HASH_ENTER, &found);

    if (entry == (SJHashEntry *) NULL) {
	SpinRelease(SJCacheLock);
	elog(FATAL, "jukebox cache hash table corrupt.");
    } else if (found) {
	SpinRelease(SJCacheLock);
	elog(FATAL, "Attempt to create existing relation -- impossible");
    }

    entry->sjhe_groupno = grpno;
    item->sjc_gflags |= SJC_IOINPROG;
#ifdef HAS_TEST_AND_SET
    SpinRelease(SJCacheLock);
    S_LOCK(item->sjc_iolock);
#else /* HAS_TEST_AND_SET */
    (*SJNWaiting)++;
    SpinRelease(SJCacheLock);
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */

    /* set flags on item, initialize group descriptor block */
    item->sjc_gflags = SJC_DIRTY;
    for (i = 0; i < SJGRPSIZE; i++)
	item->sjc_flags[i] = SJC_MISSING;

    /* should be smarter and only bzero what we need to */
    bzero(SJCacheBuf, SJBufSize);

    group = (SJGroupDesc *) (&SJCacheBuf[0]);
    group->sjgd_magic = SJGDMAGIC;
    group->sjgd_version = SJGDVERSION;

    if (reln->rd_rel->relisshared) {
	group->sjgd_dbid = (ObjectId) 0;
    } else {
	strncpy(&(group->sjgd_dbname.data[0]),
		&(MyDatabaseName->data[0]),
		sizeof(NameData));
	group->sjgd_dbid = (ObjectId) MyDatabaseId;
    }

    strncpy(&(group->sjgd_relname.data[0]),
	    &(reln->rd_rel->relname.data[0]),
	    sizeof(NameData));
    group->sjgd_relid = reln->rd_id;
    group->sjgd_relblkno = 0;
    item->sjc_oid = group->sjgd_groupoid = newoid();

    /*
     *  Record the presence of the new extent in the system catalogs.  The
     *  plid, jboffset, and extentsz fields are filled in by _sjregister()
     *  or the routines that it calls.  Note that we do not force the new
     *  group descriptor block all the way to the optical platter here.
     *  We do decide where to place it, however, and must go to a fair amount
     *  of trouble elsewhere in the code to avoid allocating the same extent
     *  to a different relation, or block within the same relation.
     */

    _sjregister(item, group);

    /*
     *  Write the new group cache entry to disk.  Sjwritegrp() knows where
     *  the cache buffer begins, and forces out the group descriptor we
     *  just set up.
     */

    if (_sjwritegrp(item, grpno) == SM_FAIL) {
	sjunwait_io(item);
	return (-1);
    }

    _sjregnblocks(reln->rd_id, 0);

    /* can now release i/o lock on the item we just added */
    sjunwait_io(item);

    /* no longer need the reference */
    _sjunpin(item);

    /* last thing to do is to create the mag-disk file to hold last page */
    if (group->sjgd_dbid == (ObjectId) 0)
d734 1
a734 1
 *	group we're kicking out, if indeed we're doing that.
d743 63
a805 4
    /* see if we can avoid doing any work here */
    if (SJHeader->sjh_nentries < SJCACHESIZE) {
	*grpno = SJHeader->sjh_nentries;
	SJHeader->sjh_nentries++;
d807 19
a825 2
	/* XXX here, need to kick someone out */
	elog(FATAL, "hey mao, your cache appears to be full.");
d828 17
a844 1
    item = &SJCache[*grpno];
d846 3
a848 8
    item->sjc_lruprev = -1;
    item->sjc_lrunext = SJHeader->sjh_lruhead;
    if (SJHeader->sjh_lruhead == -1) {
	SJHeader->sjh_lruhead = *grpno;
	SJHeader->sjh_lrutail = *grpno;
    } else {
	SJCache[SJHeader->sjh_lruhead].sjc_lruprev = *grpno;
	SJHeader->sjh_lruhead = *grpno;
d851 4
a854 2
    /* bump ref count */
    _sjtouch(item);
d856 1
a856 1
    return (item);
d877 1
a877 6
    entry = (SJHashEntry *) hash_search(SJCacheHT, &tag, HASH_FIND, &found);

    if (entry == (SJHashEntry *) NULL) {
	SpinRelease(SJCacheLock);
	elog(FATAL, "_sjfetchgrp: hash table corrupted");
    }
d888 1
a888 1
	_sjtouch(item, *grpno);
d892 49
a940 3
	SpinRelease(SJCacheLock);
	elog(FATAL, "_sjfetchgrp: hey mao: can't find <%d,%d,%d>",
		    dbid, relid, blkno);
d946 97
d1044 1
a1044 1
_sjtouch(item, grpno)
a1045 1
    int grpno;
d1047 6
a1052 2
    /* first bump the ref count */
    (item->sjc_refcount)++;
d1054 5
a1058 3
    /* now move it to the top of the lru list */
    if (item->sjc_lruprev == -1)
	return;
d1060 5
a1064 4
    if (item->sjc_lrunext == -1)
	SJHeader->sjh_lrutail = item->sjc_lruprev;
    else
	SJCache[item->sjc_lrunext].sjc_lruprev = item->sjc_lruprev;
d1066 2
a1067 1
    SJCache[item->sjc_lruprev].sjc_lrunext = item->sjc_lrunext;
d1069 3
a1071 3
    item->sjc_lruprev = -1;
    item->sjc_lrunext = SJHeader->sjh_lruhead;
    SJHeader->sjh_lruhead = grpno;
d1074 9
d1087 3
d1091 2
d1095 23
a1117 1
    (item->sjc_refcount)--;
d1149 1
a1149 1
    seekpos = grpno * SJBufSize;
d1153 1
a1153 1
    nbytes = SJBufSize;
d1180 1
a1180 1
    seekpos = grpno * SJBufSize;
d1186 1
a1186 1
    nbytes = SJBufSize;
d1231 1
d1235 1
a1235 1
    base = nblocks / SJGRPSIZE;
d1247 5
a1251 1
    entry = (SJHashEntry *) hash_search(SJCacheHT, &tag, HASH_FIND, &found);
d1253 3
a1255 3
    if (entry == (SJHashEntry *) NULL) {
	SpinRelease(SJCacheLock);
	elog(FATAL, "sjextend: cache hash table corrupted");
d1258 2
d1268 11
a1278 1
    _sjtouch(item, grpno);
d1280 6
a1285 5
    for (blkno = 0; blkno < SJGRPSIZE; blkno++) {
	if (item->sjc_flags[blkno] & SJC_MISSING) {
	    item->sjc_flags[blkno] &= ~SJC_MISSING;
	    item->sjc_flags[blkno] |= SJC_DIRTY;
	    break;
d1287 2
d1291 1
a1291 6
    if (blkno == SJGRPSIZE) {
	SpinRelease(SJCacheLock);
	elog(WARN, "sjextend:  hey mao:  no missing blocks to extend");
    }

    item->sjc_gflags |= SJC_IOINPROG;
d1293 1
a1293 8
#ifdef HAS_TEST_AND_SET
    SpinRelease(SJCacheLock);
    S_LOCK(item->sjc_iolock);
#else /* HAS_TEST_AND_SET */
    (*SJNWaiting)++;
    SpinRelease(SJCacheLock);
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */
d1296 1
a1296 1
	sjunwait_io(item);
d1303 9
d1313 1
a1313 1
	sjunwait_io(item);
d1317 1
a1317 1
    sjunwait_io(item);
d1320 2
a1321 1
    _sjregnblocks(reln->rd_id, ++nblocks);
d1326 133
d1504 1
a1504 10
    item->sjc_gflags |= SJC_IOINPROG;

#ifdef HAS_TEST_AND_SET
    SpinRelease(SJCacheLock);
    S_LOCK(item->sjc_iolock);
#else /* HAS_TEST_AND_SET */
    (*SJNWaiting)++;
    SpinRelease(SJCacheLock);
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */
d1507 1
a1507 1
	sjunwait_io(item);
d1514 1
a1514 1
    sjunwait_io(item);
a1551 1
    item->sjc_gflags |= SJC_IOINPROG;
d1553 1
a1553 8
#ifdef HAS_TEST_AND_SET
    SpinRelease(SJCacheLock);
    S_LOCK(item->sjc_iolock);
#else /* HAS_TEST_AND_SET */
    (*SJNWaiting)++;
    SpinRelease(SJCacheLock);
    IpcSemaphoreLock(SJWaitSemId, 0, 1);
#endif /* HAS_TEST_AND_SET */
d1556 1
a1556 1
	sjunwait_io(item);
d1565 1
a1565 1
	sjunwait_io(item);
d1570 1
a1570 1
    sjunwait_io(item);
d1600 3
a1602 2
 *	This is an unbelievably expensive operation.  We should cache this
 *	number in shared memory once we compute it.
d1609 1
a1609 18
    Relation plmap;
    TupleDescriptor plmdesc;
    HeapScanDesc plmscan;
    HeapTuple plmtup;
    Buffer buf;
    ObjectId reldbid;
    Datum d;
    Boolean n;
    int32 v;
    int32 maxblkno;
    int i;
    int grpno;
    SJCacheItem *item;
    ScanKeyEntryData plmkey[2];

    /* see if we've already figured this out */
    if ((maxblkno = _sjfindnblocks(reln->rd_id)) >= 0)
	return (maxblkno);
d1612 1
a1612 1
	reldbid = (ObjectId) 0;
d1614 1
a1614 1
	reldbid = MyDatabaseId;
d1616 1
a1616 45
    ScanKeyEntryInitialize(&plmkey[0], 0x0, Anum_pg_plmap_pldbid,
			   ObjectIdEqualRegProcedure,
			   ObjectIdGetDatum(reldbid));

    ScanKeyEntryInitialize(&plmkey[1], 0x0, Anum_pg_plmap_plrelid,
			   ObjectIdEqualRegProcedure,
			   ObjectIdGetDatum(reln->rd_id));

    plmap = heap_openr(Name_pg_plmap);
    plmdesc = RelationGetTupleDescriptor(plmap);
    plmscan = heap_beginscan(plmap, false, NowTimeQual, 2, &plmkey[0]);

    maxblkno = 0;

    /*
     *  Find the highest-numbered group in the relation by scanning
     *  pg_plmap.
     */

    while (HeapTupleIsValid(plmtup = heap_getnext(plmscan, false,
						  (Buffer *) NULL))) {
	d = (Datum) heap_getattr(plmtup, InvalidBuffer, Anum_pg_plmap_plblkno,
				 plmdesc, &n);
	v = DatumGetInt32(d);
	if (v > maxblkno)
	    maxblkno = v;
    }

    heap_endscan(plmscan);
    heap_close(plmap);

    /*
     *  Get the highest-numbered group, and count the number of blocks
     *  that are actually present in the group.
     */

    item = _sjfetchgrp(reldbid, reln->rd_id, maxblkno, &grpno);

    for (i = 0; i < SJGRPSIZE; i++) {
	if (item->sjc_flags[i] & SJC_MISSING)
	    break;
    }

    /* don't need the reference anymore */
    _sjunpin(item);
d1618 1
a1618 3
    /* adjust the count of blocks and remember it for next time */
    maxblkno += i;
    _sjregnblocks(reln->rd_id, maxblkno);
d1620 1
a1620 1
    return(maxblkno);
d1624 1
a1624 3
 *  _sjfindnblocks() -- Find a precomputed block count for the given relid.
 *
 *	We should really do something smarter here.
d1628 2
a1629 2
_sjfindnblocks(relid)
    ObjectId relid;
d1632 2
d1635 1
d1639 1
a1639 1
	if (l->sjnb_relid == relid)
d1645 15
a1659 1
    return (-1);
a1663 2
 *
 *	Should really do something smarter here.
d1667 2
a1668 3
_sjregnblocks(relid, nblocks)
    ObjectId relid;
    int nblocks;
d1670 1
d1672 1
d1679 4
a1682 3
	if (l->sjnb_relid == relid) {
	    l->sjnb_nblocks = nblocks;
	    return;
a1683 1

d1688 33
a1720 5
    l = (SJNBlock *) palloc(sizeof(SJNBlock));
    l->sjnb_relid = relid;
    l->sjnb_nblocks = nblocks;
    l->sjnb_next = SJNBlockList;
    SJNBlockList = l;
d1823 1
a1823 1
	    seekpos = i * SJBufSize;
d1871 3
a1873 3
    printf("jukebox cache metdata: size %d, %d entries, lru head %d tail %d",
	   SJCACHESIZE, nentries, SJHeader->sjh_lruhead,
	   SJHeader->sjh_lrutail);
d1880 1
a1880 1
    for (i = 0; i < nentries; i++) {
d1885 1
a1885 1
	       item->sjc_lrunext, item->sjc_lruprev,
@


1.9
log
@hash table operators should not have same names as grammar tokens
@
text
@d34 1
a34 1
RcsId("$Header: RCS/sj.c,v 1.8 91/07/26 00:52:21 mao Exp Locker: mer $");
d36 1
a36 140
/*
 *  When the buffer pool requests a particular page, we load a group of
 *  pages from the jukebox into the mag disk cache for efficiency.
 *  SJCACHESIZE is the number of these groups in the disk cache.  Every
 *  group is represented by one entry in the shared memory cache.  SJGRPSIZE
 *  is the number of 8k pages in a group.
 */

#define	SJCACHESIZE	64		/* # groups in mag disk cache */
#define	SJGRPSIZE	10		/* # 8k pages in a group */
#define SJPATHLEN	64		/* size of path to cache file */

/* misc constants */
#define	SJCACHENAME	"_sj_cache_"	/* relative to $POSTGRESHOME/data */
#define	SJMETANAME	"_sj_meta_"	/* relative to $POSTGRESHOME/data */

/* bogus macros */
#define	RelationSetLockForExtend(r)

/*
 *  SJGroupDesc -- Descriptor block for a cache group.
 *
 *	The first 1024 bytes in a group -- on a platter or in the magnetic
 *	disk cache -- are a descriptor block.  We choose 1024 bytes because
 *	this is the native block size of the jukebox.
 *
 *	This block includes a description of the data that appears in the
 *	group, including relid, dbid, relname, dbname, and a unique OID
 *	that we use to verify cache consistency on startup.  SJGroupDesc
 *	is the structure that contains this information.  It resides at the
 *	start of the 1024-byte block; the rest of the block is unused.
 */

typedef struct SJGroupDesc {
    long	sjgd_magic;
    long	sjgd_version;
    NameData	sjgd_dbname;
    NameData	sjgd_relname;
    ObjectId	sjgd_dbid;
    ObjectId	sjgd_relid;
    long	sjgd_relblkno;
    long	sjgd_jboffset;
    long	sjgd_extentsz;
    ObjectId	sjgd_groupoid;
} SJGroupDesc;

#define SJGDMAGIC	0x060362
#define	SJGDVERSION	0
#define JBBLOCKSZ	1024

/*
 *  SJCacheTag -- Unique identifier for individual groups in the magnetic
 *		  disk cache.
 *
 *	We use this identifier to query the shared memory cache metadata
 *	when we want to find a particular group.  
 */

typedef struct SJCacheTag {
    ObjectId		sjct_dbid;	/* database OID of this group */
    ObjectId		sjct_relid;	/* relation OID of this group */
    BlockNumber		sjct_base;	/* number of first block in group */
} SJCacheTag;

/*
 *  SJHashEntry -- The hash table code returns a pointer to a structure
 *		   that has this layout.
 */

typedef struct SJHashEntry {
    SJCacheTag		sjhe_tag;	/* cache tag -- hash key */
    int			sjhe_groupno;	/* which group this is in cache file */
} SJHashEntry;

/*
 *  SJCacheHeader -- Header data for in-memory metadata cache.
 */

typedef struct SJCacheHeader {
    int			sjh_nentries;
    int			sjh_lruhead;
    int			sjh_lrutail;
    uint32		sjh_flags;

#define SJH_INITING	(1 << 0)
#define SJH_INITED	(1 << 1)

#ifdef HAS_TEST_AND_SET

    slock_t		sjh_initlock;	/* initialization in progress lock */

#endif /* HAS_TEST_AND_SET */

} SJCacheHeader;

/*
 *  SJCacheItem -- Cache item describing blocks on the magnetic disk cache.
 *
 *	An array of these is maintained in shared memory, with one entry
 *	for every group that appears in the magnetic disk block cache.  We
 *	maintain a consistent copy of this array on magnetic disk whenever
 *	we change the cache contents.  This is because the magnetic disk
 *	cache is persistent, and contains data that logically appears on the
 *	jukebox between backend instances.
 *
 *	The OID that appears in this structure is used to detect corruption
 *	of the cache due to crashes during cache metadata update on disk.
 *	When we detect corruption, we recover by marking the group free.  We
 *	are very careful to do this in a way that guarantees no data is lost,
 *	and that does not require log processing.
 *
 *	Since we never return pointers to private data, we don't need to
 *	maintain a free list or pin count on magnetic disk cache groups.
 *	In shared memory, we maintain a list of groups in LRU order (offsets
 *	from the start of cache metadata are stored in this structure).
 *	When we need a group for data transfer, we use the least-recently-used
 *	group's space, kicking it out to the platter if necessary.
 *
 *	Groups on the jukebox include one page (the first) that describes the
 *	group, including its dbid, relid, dbname, relname, and extent size.
 *	This page also includes the OID described above.
 */

typedef struct SJCacheItem {
    SJCacheTag		sjc_tag;		/* dbid, relid, group triple */
    int			sjc_lruprev;		/* LRU list pointer */
    int			sjc_lrunext;		/* LRU list pointer */
    int			sjc_refcount;		/* number of active refs */
    ObjectId		sjc_oid;		/* OID of group */

    uint8		sjc_gflags;		/* flags for entire group */

#define SJG_CLEAR	(uint8) 0x0
#define	SJG_IOINPROG	(1 << 0)

    uint8		sjc_flags[SJGRPSIZE];	/* flag bytes, 1 per block */

#define	SJC_DIRTY	(1 << 0)
#define SJC_MISSING	(1 << 1)
#define SJC_ONPLATTER	(1 << 2)
a37 23
#ifdef HAS_TEST_AND_SET

    slock_t		sjc_iolock;		/* transfer in progress */

#endif /* HAS_TEST_AND_SET */

} SJCacheItem;

/*
 *  SJNBlock -- Linked list of count of blocks in relations.
 *
 *	Computing a block count is so expensive that we cache the count
 *	in local space when we've done the work.  This is really a stupid
 *	way to do it -- we'd rather do it in shared memory and have the
 *	computed count survive transactions -- but this will work for now.
 */

typedef struct SJNBlock {
    ObjectId		sjnb_relid;
    int			sjnb_nblocks;
    struct SJNBlock	*sjnb_next;
} SJNBlock;

d40 2
a41 3
extern bool		IsPostmaster;	/* is this the postmaster running? */
extern ObjectId		MyDatabaseId;	/* OID of database we have open */
extern Name		MyDatabaseName;	/* name of database we have open */
a70 1
static int	SJBufSize = ((BLCKSZ * SJGRPSIZE) + JBBLOCKSZ);
d72 2
a73 14
/* routines declared here */
extern void		sjcacheinit();
extern void		sjwait_init();
extern void		sjunwait_init();
extern void		sjwait_io();
extern void		sjunwait_io();
extern void		sjtouch();
extern void		sjunpin();
extern void		sjregister();
extern void		sjregnblocks();
extern int		sjfindnblocks();
extern ObjectId		sjchoose();
extern SJCacheItem	*sjallocgrp();
extern SJCacheItem	*sjfetchgrp();
d75 18
a198 9
    if (IsPostmaster) {

	if (metafound)
	    elog(FATAL, "sj cache found in shared memory by postmaster!");

	bzero((char *) cachesave, metasize);
	return (SM_SUCCESS);
    }

d235 1
a235 1
		sjunwait_init();
d251 1
a251 1
		sjunwait_init();
d262 4
a265 4
     *  Finally, if it's our responsibility to initialize the shared-memory
     *  cache metadata, then go do that.  sjcacheinit() will elog(FATAL, ...)
     *  if it can't initialize the cache, so we don't need to worry about
     *  a return value here.
d269 1
a269 1
	sjcacheinit();
d272 8
d283 2
a284 2
void
sjcacheinit()
d308 1
a308 1
	sjunwait_init();
d331 1
a331 1
	    sjunwait_init();
d337 1
a337 1
	    sjunwait_init();
d353 2
a354 2
	/* not waiting on I/O or anything, no active references to this guy */
	cur->sjc_gflags = SJG_CLEAR;
d375 1
a375 1
 *  sjunwait_init() -- Release initialization lock on the jukebox cache.
d384 1
a384 1
 *	finish, we call sjunwait_init() to release the initialization lock
d392 2
a393 2
void
sjunwait_init()
d424 1
a424 1
    item->sjc_gflags &= ~SJG_IOINPROG;
d443 1
a443 1
 *  sjwait_init() -- Wait for cache initialization to complete.
d451 2
a452 2
void
sjwait_init()
d466 1
a466 1
 *  sjwait_io() -- Wait for group IO to complete.
d475 2
a476 2
void
sjwait_io(item)
d539 1
a539 1
	    sjwait_init();
d551 1
a551 1
    item = sjallocgrp(&grpno);
d572 1
a572 1
    item->sjc_gflags = SJG_IOINPROG;
d583 1
a607 2
    group->sjgd_jboffset = -1;
    group->sjgd_extentsz = (SJBufSize / JBBLOCKSZ);
d610 19
a628 1
    if (sjwritegrp(item, grpno) == SM_FAIL) {
d633 1
a633 3
    /* record presence of new extent in system catalogs */
    sjregister(item, group->sjgd_jboffset, group->sjgd_extentsz);
    sjregnblocks(reln->rd_id, 0);
d639 1
a639 1
    sjunpin(item);
d655 1
a655 1
 *  sjregister() -- Make catalog entry for a new extent
d666 2
a667 2
void
sjregister(item, jboffset, extentsz)
d669 1
a669 2
    int32 jboffset;
    int32 extentsz;
d676 9
a684 4
    plmap = heap_openr(Name_pg_plmap);
    RelationSetLockForWrite(plmap);

    plmdata = (Form_pg_plmap) palloc(sizeof(FormData_pg_plmap));
d686 1
a686 2
    /* choose a platter to put the new extent on */
    plmdata->plid = sjchoose(item);
d688 4
a691 6
    /* init the rest of the fields */
    plmdata->pldbid = item->sjc_tag.sjct_dbid;
    plmdata->plrelid = item->sjc_tag.sjct_relid;
    plmdata->plblkno = item->sjc_tag.sjct_base;
    plmdata->ploffset = jboffset;
    plmdata->plextentsz = extentsz;
d701 4
d708 4
d713 1
a713 1
    heap_close(plmap);
d717 1
a717 1
 *  sjchoose() -- Choose a platter to receive a new extent.
d723 2
a724 2
ObjectId
sjchoose(item)
d728 1
d732 1
d734 8
d744 1
d749 9
a757 1
	elog(WARN, "sjchoose: no platters in pg_plmap");
d759 1
a759 1
    plid = plattup->t_oid;
a760 1

d764 13
a776 1
    return (plid);
d780 1
a780 1
 *  sjallocgrp() -- Allocate a new group in the cache for use by some
d792 2
a793 2
SJCacheItem *
sjallocgrp(grpno)
d820 1
a820 1
    sjtouch(item);
d825 2
a826 2
SJCacheItem *
sjfetchgrp(dbid, relid, blkno, grpno)
d847 1
a847 1
	elog(FATAL, "sjfetchgrp: hash table corrupted");
d854 3
a856 3
	if (item->sjc_gflags & SJG_IOINPROG) {
	    sjwait_io(item);
	    return (sjfetchgrp(dbid, relid, blkno));
d859 1
a859 1
	sjtouch(item, *grpno);
d864 1
a864 1
	elog(FATAL, "sjfetchgrp: hey mao: can't find <%d,%d,%d>",
d871 2
a872 2
void
sjtouch(item, grpno)
d895 2
a896 2
void
sjunpin(item)
d901 1
a901 1
	elog(FATAL, "sjunpin: illegal reference count");
d906 2
a907 2
int
sjwritegrp(item, grpno)
d953 2
a954 2
int
sjreadgrp(item, grpno)
d967 1
a967 1
	elog(NOTICE, "sjreadgrp: cannot seek");
d976 1
a976 1
	    elog(NOTICE, "sjreadgrp: read failed");
d988 1
a988 1
	elog(NOTICE, "sjreadgrp: trashed cache");
d1046 1
a1046 1
    sjtouch(item, grpno);
d1061 1
a1061 1
    item->sjc_gflags = SJG_IOINPROG;
d1072 1
a1072 1
    if (sjreadgrp(item, grpno) == SM_FAIL) {
d1080 1
a1080 1
    if (sjwritegrp(item, grpno) == SM_FAIL) {
d1086 1
a1086 1
    sjunpin(item);
d1088 1
a1088 1
    sjregnblocks(reln->rd_id, ++nblocks);
d1133 1
a1133 1
    item = sjfetchgrp(reldbid, reln->rd_id, blocknum / SJGRPSIZE, &grpno);
d1135 1
a1135 1
    /* shd expand sjfetchgrp() inline to avoid extra semop()s */
d1138 1
a1138 1
    item->sjc_gflags = SJG_IOINPROG;
d1149 1
a1149 1
    if (sjreadgrp(item, grpno) == SM_FAIL) {
d1158 1
a1158 1
    sjunpin(item);
d1180 1
a1180 1
    item = sjfetchgrp(reldbid, reln->rd_id, blocknum / SJGRPSIZE, &grpno);
d1182 1
a1182 1
    /* shd expand sjfetchgrp() inline to avoid extra semop()s */
d1189 1
a1189 1
	sjunpin(item);
d1195 1
a1195 1
    item->sjc_gflags = SJG_IOINPROG;
d1206 1
a1206 1
    if (sjreadgrp(item, grpno) == SM_FAIL) {
d1208 1
a1208 1
	sjunpin(item);
d1215 1
a1215 1
    if (sjwritegrp(item, grpno) == SM_FAIL) {
d1217 1
a1217 1
	sjunpin(item);
d1222 1
a1222 1
    sjunpin(item);
d1275 1
a1275 1
    if ((maxblkno = sjfindnblocks(reln->rd_id)) >= 0)
d1319 1
a1319 1
    item = sjfetchgrp(reldbid, reln->rd_id, maxblkno, &grpno);
d1327 1
a1327 1
    sjunpin(item);
d1331 1
a1331 1
    sjregnblocks(reln->rd_id, maxblkno);
d1337 1
a1337 1
 *  sjfindnblocks() -- Find a precomputed block count for the given relid.
d1342 2
a1343 2
int
sjfindnblocks(relid)
d1361 1
a1361 1
 *  sjregnblocks() -- Remember the count of blocks for this relid.
d1366 2
a1367 2
void
sjregnblocks(relid, nblocks)
d1434 2
d1441 3
d1447 85
d1553 1
a1553 1
	printf("    [%2d] <%ld,%ld,%ld> next %d prev %d flags %s oid %ld\n",
d1555 3
a1557 3
	       item->sjc_tag.sjct_base, item->sjc_lrunext,
	       item->sjc_lruprev,
	       (item->sjc_gflags & SJG_IOINPROG ? "IO_IN_PROG" : "CLEAR"),
@


1.8
log
@bug fix -- was passing a structure instead of a pointer.  you've got to
be careful what you dereference, these days.
@
text
@d34 1
a34 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.7 1991/07/24 23:37:03 mao Exp mao $");
d475 1
a475 1
					     ENTER, &found);
d720 1
a720 1
    entry = (SJHashEntry *) hash_search(SJCacheHT, item, ENTER, &found);
d948 1
a948 1
    entry = (SJHashEntry *) hash_search(SJCacheHT, &tag, FIND, &found);
d1136 1
a1136 1
    entry = (SJHashEntry *) hash_search(SJCacheHT, &tag, FIND, &found);
@


1.7
log
@clean up conditional compilation, fix bug in initialization code for
platforms with test and set locks
@
text
@d34 1
a34 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.6 1991/07/24 07:47:24 mao Exp mao $");
d1206 1
a1206 1
    path = relpath(reln->rd_rel->relname);
@


1.6
log
@fix size computations, add main memory storage manager
@
text
@d34 1
a34 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.5 1991/07/22 22:21:11 mao Exp mao $");
d375 1
a380 1
	SJHeader->sjh_flags = SJH_INITING;
a1584 117
}

#else /* SONY_JUKEBOX */

#include "machine.h"
#include "storage/smgr.h"
#include "utils/rel.h"

/*
 *  If there's no sony jukebox, we just use stub routines.
 */

int
sjinit(unused)
    int unused;
{
    return (SM_SUCCESS);
}

int
sjshutdown()
{
    return (SM_SUCCESS);
}

int
sjcreate(reln)
    Relation reln;
{
    return (-1);
}

int
sjunlink(reln)
    Relation reln;
{
    return (SM_FAIL);
}

int
sjextend(reln, buffer)
    Relation reln;
    char *buffer;
{
    return (SM_FAIL);
}

int
sjopen(reln)
    Relation reln;
{
    return (-1);
}

int
sjclose(reln)
    Relation reln;
{
    return (SM_FAIL);
}

int
sjread(reln, blocknum, buffer)
    Relation reln;
    BlockNumber blocknum;
    char *buffer;
{
    return (SM_FAIL);
}

int
sjwrite(reln, blocknum, buffer)
    Relation reln;
    BlockNumber blocknum;
    char *buffer;
{
    return (SM_FAIL);
}

int
sjflush(reln, blocknum, buffer)
    Relation reln;
    BlockNumber blocknum;
    char *buffer;
{
    return (SM_FAIL);
}

int
sjblindwrt(dbstr, relstr, dbid, relid, blkno, buffer)
    char *dbstr;
    char *relstr;
    OID dbid;
    OID relid;
    BlockNumber blkno;
    char *buffer;
{
    return (SM_FAIL);
}

int
sjnblocks(reln)
    Relation reln;
{
    return (-1);
}

int
sjcommit()
{
    return (SM_SUCCESS);
}

int
sjabort()
{
    return (SM_SUCCESS);
@


1.5
log
@jukebox storage manager installation
@
text
@d34 1
a34 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.4 1991/07/22 08:00:36 mao Exp mao $");
a1531 8
    nbuckets = 1 << my_log2((SJCACHESIZE - 1) / DEF_FFACTOR + 1);
    nsegs = 1 << my_log2((nbuckets - 1) / DEF_SEGSIZE + 1);

    /* size of shared memory binding table */
    size = my_log2(BTABLE_SIZE) + sizeof(HHDR)
            + DEF_SEGSIZE * sizeof(SEGMENT) + BUCKET_ALLOC_INCR *
            (sizeof(BUCKET_INDEX) + BTABLE_KEYSIZE + BTABLE_DATASIZE);

d1533 1
a1533 1
    size += ((SJCACHESIZE + 1) * sizeof(SJCacheItem)) + sizeof(SJCacheHeader);
@


1.4
log
@added code for just about everything, still pretty buggy
@
text
@d34 1
a34 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.3 1991/07/22 05:32:38 mao Exp mao $");
d840 5
a1068 11
    long	sjgd_magic;
    long	sjgd_version;
    NameData	sjgd_dbname;
    NameData	sjgd_relname;
    ObjectId	sjgd_dbid;
    ObjectId	sjgd_relid;
    long	sjgd_relblkno;
    long	sjgd_jboffset;
    long	sjgd_extentsz;
    ObjectId	sjgd_groupoid;

d1407 3
a1409 2
    while (HeapTupleIsValid(plmtup = heap_getnext(plmscan, false, &buf))) {
	d = (Datum) heap_getattr(plmtup, buf, Anum_pg_plmap_plblkno,
a1410 1
	ReleaseBuffer(buf);
d1501 1
a1501 8
    SJNBlock *l;

    while (SJNBlockList != (SJNBlock *) NULL) {
	l = SJNBlockList;
	SJNBlockList = SJNBlockList->sjnb_next;
	pfree(l);
    }

d1510 1
a1510 8
    SJNBlock *l;

    while (SJNBlockList != (SJNBlock *) NULL) {
	l = SJNBlockList;
	SJNBlockList = SJNBlockList->sjnb_next;
	pfree(l);
    }

@


1.3
log
@more stuff working -- create updates catalogs, etc
@
text
@d34 1
a34 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.2 1991/07/21 23:13:32 mao Exp mao $");
d52 3
d583 2
d733 1
d737 1
a740 3
    /* okay, cache and group are set up -- safe to release excl lock now */
    SpinRelease(SJCacheLock);

d777 1
d787 1
a787 1
	sprintf(path, "../%16s", &(reln->rd_rel->relname.data[0]));
d789 1
a789 1
	sprintf(path, "%16s", &(reln->rd_rel->relname.data[0]));
d791 2
d926 1
a926 1
sjfetchgrp(dbid, relid, blkno)
d930 1
d951 2
a952 1
	item = &(SJCache[entry->sjhe_groupno]);
d959 1
a959 1
	sjtouch(item, entry->sjhe_groupno);
d1003 1
d1054 53
d1118 84
a1201 1
    return (SM_FAIL);
d1208 9
a1216 1
    return (-1);
d1223 3
a1225 1
    return (SM_FAIL);
d1234 38
a1271 1
    return (SM_FAIL);
d1280 56
a1335 1
    return (SM_FAIL);
d1344 1
a1344 1
    return (SM_FAIL);
a1365 2
#define	RelationSetLockForExtend(r)

d1381 1
d1383 1
a1383 4
    ScanKeyEntry plmkey[2];

    /* need to guarantee reln doesn't change size while we're thinking */
    RelationSetLockForExtend(reln);
d1430 1
a1430 1
    item = sjfetchgrp(reldbid, reln->rd_id, maxblkno);
@


1.2
log
@checkpoint -- sony jukebox manager starting to work
@
text
@d16 1
d22 1
d27 6
a32 1
RcsId("$Header: /users/mao/postgres/src/storage/smgr/RCS/sj.c,v 1.1 1991/07/09 00:12:09 mao Exp mao $");
d34 2
d160 1
d182 15
d208 1
d234 13
a246 5
extern void	sjcacheinit();
extern void	sjwait_init();
extern void	sjunwait_init();
extern void	sjwait_io();
extern void	sjunwait_io();
d422 3
d509 1
a509 1
	/* not waiting on I/O or anything */
d511 1
d705 1
a705 2
    grpno = sjallocgrp();
    item = &SJCache[grpno];
d763 1
a763 1
    group->sjgd_extentsz = -1;
d771 3
d777 3
d792 81
d885 3
a887 2
int
sjallocgrp()
a888 1
    int grpno;
d893 1
a893 1
	grpno = SJHeader->sjh_nentries;
d900 1
a900 1
    item = &SJCache[grpno];
d905 48
a952 2
	SJHeader->sjh_lruhead = grpno;
	SJHeader->sjh_lrutail = grpno;
d954 3
a956 2
	SJCache[SJHeader->sjh_lruhead].sjc_lruprev = grpno;
	SJHeader->sjh_lruhead = grpno;
d959 35
a993 1
    return (grpno);
d1111 9
d1124 100
d1227 33
d1263 10
d1279 10
@


1.1
log
@Initial revision
@
text
@d4 2
a5 1
 *	This code manages relations that reside on magnetic disk.
d8 2
d13 2
d16 2
d19 3
d23 917
d941 2
a942 1
RcsId("$Header$");
d944 66
d1011 1
a1011 1
 *  Only stub routines right now.
d1015 2
a1016 1
sjinit()
d1120 2
@
