/*
 * rlockutils.c --
 *	Routines to manipulate locks for the planner and executor.
 */

#include "anum.h"
#include "bufmgr.h"
#include "catname.h"
#include "rproc.h"
#include "fmgr.h"
#include "ftup.h"
#include "hamr.h"
#include "heapam.h"
#include "lispdep.h"
#include "log.h"
#include "rac.h"
#include "rlock.h"
#include "syscache.h"
#include "tqual.h"

RcsId("$Header: util2.c,v 1.1 89/02/20 16:27:29 hirohama Exp $");

/*
 * HAS_RULSERVER is deliberately left undefined (XXX define it iff
 * rule->rulserver works).  While it is undefined, the rulserver existence
 * check in RuleModifyCatalog is compiled out.
 */

/* XXX unconditionally enables the RULELOCKDEBUG tracing code in this file */
#define RULELOCKDEBUG	/* XXX */

/*
 *	MakeRuleLockIntermediateLock
 *
 *	Called by the planner to generate a new RuleLockIntermediateLockData.
 *
 *	Returns a pointer to a malloc'd RuleLockIntermediateLockData.
 */
RuleLockIntermediateLockData *
MakeRuleLockIntermediateLock(locktype, attrno, planno)
	int32	locktype, attrno, planno;
{
	RuleLockIntermediateLockData	*ruleLockIntermediateLock;

	ruleLockIntermediateLock = (RuleLockIntermediateLockData *) 
		malloc(sizeof(RuleLockIntermediateLockData));
	ruleLockIntermediateLock->locktype = locktype;
	ruleLockIntermediateLock->attrno = attrno;
	ruleLockIntermediateLock->planno = planno;
	ruleLockIntermediateLock->next_lock = NULL;
	return(ruleLockIntermediateLock);
}


/*
 *	MakeRuleLockIntermediate
 *
 *	Called by the planner to generate a new RuleLockIntermediate.
 *	To do this, it generates a RuleLockIntermediate header and
 *	attaches the LISP list of RuleLockIntermediateLockData to the header.
 *
 *	Returns a pointer to a malloc'd RuleLockIntermediate.
 */
RuleLockIntermediate *
MakeRuleLockIntermediate(ruleid, priority, ruletype, isearly, intermediates)
	int32		ruleid, priority, ruletype, isearly;
	LispValue	intermediates;
{
	register LispValue		lp;
	RuleLockIntermediate		*rlip;

	rlip = (RuleLockIntermediate *) malloc(sizeof(RuleLockIntermediate));
	rlip->ruleid = ruleid;
	rlip->priority = priority;
	rlip->ruletype = ruletype;
	rlip->isearly = (bool) isearly;
	for (lp = intermediates; CDR(lp) != LispNil; lp = CDR(lp))
		((RuleLockIntermediateLockData *)
		 LISPVALUE_INTEGER(CAR(lp)))->next_lock =
			 (RuleLockIntermediateLockData *)
				 LISPVALUE_INTEGER(CAR(CDR(lp)));
	rlip->locks = (RuleLockIntermediateLockData *)
		LISPVALUE_INTEGER(CAR(intermediates));
	rlip->next_rulepack = (RuleLockIntermediate *) NULL;
	return(rlip);
}


/*
 *	MakeRuleLock
 *
 *	Called by the executor to generate a new (internal) RuleLock.
 *	A LISP list of pointers to the RuleLockIntermediates are linked into
 *	a list of rulepacks and the whole shebang is converted to an internal
 *	RuleLock.
 *
 *	Returns a pointer to a new RuleLock.
 */
RuleLock
MakeRuleLock(intermediates)
	LispValue	intermediates;
{
	register LispValue	lp;
	RuleLockIntermediate	*rlip;

	for (lp = intermediates; CDR(lp) != LispNil; lp = CDR(lp))
		((RuleLockIntermediate *)
		 LISPVALUE_INTEGER(CAR(lp)))->next_rulepack =
			 (RuleLockIntermediate *)
				 LISPVALUE_INTEGER(CAR(CDR(lp)));
	rlip = (RuleLockIntermediate *) LISPVALUE_INTEGER(CAR(intermediates));
	return(RuleLockIntermediateToInternal(rlip));
}


/*
 *	RuleLockIntermediateDump
 *
 *	Debugging aid: prints the given RuleLockIntermediate on stdout by
 *	handing it to RuleLockIntermediatePrint.
 */
void
RuleLockIntermediateDump(ruleLockIntermediate)
	RuleLockIntermediate	*ruleLockIntermediate;
{
	FILE	*out = stdout;

	RuleLockIntermediatePrint(out, ruleLockIntermediate);
}


/*
 *	RuleLockIntermediateSetDirection
 *
 *	Stores 'direction' in the isearly flag of the given
 *	RuleLockIntermediate.
 */
void
RuleLockIntermediateSetDirection (ruleLockIntermediate, direction)
	RuleLockIntermediate	*ruleLockIntermediate;
	bool			direction;
{
	bool	early = direction;

	ruleLockIntermediate->isearly = early;
}


/*
 *	RuleLockIntermediateRelationLock
 *
 *	Looks up the tuple with object id 'reloid' in the already-open
 *	relation 'relrel', converts 'newlock' to an internal RuleLock and
 *	adds it to that tuple with RelationAddHeapRuleLock.  The tuple is
 *	then fetched again by item pointer and the lock found on it is
 *	logged at DEBUG level.
 *
 *	When RULELOCKDEBUG is defined, the tuple and any rule lock already
 *	on it are dumped before the new lock is added.
 *
 *	If no tuple matches 'reloid' the scan is ended, the failure is
 *	reported through elog(WARN) and nothing is modified.
 *
 *	Really should use the syscache here to speed things up.
 */
void 
RuleLockIntermediateRelationLock (relrel, reloid, newlock)
	Relation		relrel;
	ObjectId  		reloid;
	RuleLockIntermediate 	*newlock;
{
	HeapScan		sd;
	static ScanKeyEntryData	key[1] = {
		{ 0, ObjectIdAttributeNumber, ObjectIdEqualRegProcedure }
	};
	HeapTuple 		tup;
	ItemPointerData		tid;
	RuleLock 		inlock;
	RuleLockIntermediate 	*oldlock;
	Buffer 			buffer;
	Boolean 		isnull;
	extern char		*amgetattr();	/* XXX */

	/* validate the relation before it is used, not afterwards */
	Assert (RelationIsValid (relrel));

	key[0].argument.objectId.value = reloid;

	sd = RelationBeginHeapScan(relrel, 0, SelfTimeQual, 1, (ScanKey) key);

	tup = (HeapTuple) HeapScanGetNextTuple(sd, 0, &buffer);
	if (!HeapTupleIsValid(tup)) {
		HeapScanEnd(sd);
		elog(WARN,
		     "RuleLockIntermediateRelationLock(%lu): no such relation",
		     (long unsigned) reloid);
		/* in case elog returns: there is no tuple to work on */
		return;
	}
	/* remember where the tuple lives; it is re-fetched by tid below */
	tid = tup->t_ctid;
	Assert (ItemPointerIsValid (&tid));

	Assert (BufferIsValid (buffer));
	inlock = HeapTupleGetRuleLock (tup, buffer);

	/* 
	 * debugging stuff 
	 */
#ifdef RULELOCKDEBUG
	debugtup (tup, &relrel->rd_att);
	if (inlock != (RuleLock) NULL) {
		elog  (DEBUG,"RuleLockIntermediateRelationLock: lock is not null");
		oldlock = RuleLockInternalToIntermediate (inlock);
		/*
		 * NOTE(review): oldlock is a RuleLockIntermediate *, yet it
		 * is checked with the RuleLock validity test -- confirm.
		 */
		Assert (RuleLockIsValid(oldlock));
		RuleLockIntermediateDump (oldlock);
	}    

	/*
	 * test. see if amgetattr will get back some locks. compare with 
	 * RelationGetRuleLocks. 
	 *
	 * The tuple came from relrel, so relrel's descriptor is the one to
	 * use here.
	 */
	inlock = (RuleLock) amgetattr(tup, buffer,
				      RuleLockAttributeNumber,
				      &relrel->rd_att, &isnull);
	if (isnull) {
		elog (DEBUG, "lock is null.");
	}
	if  (inlock != (RuleLock) NULL) {
		elog   (DEBUG, "rulelock is not null.. \n");
		oldlock = RuleLockInternalToIntermediate (inlock);
		RuleLockIntermediateDump (oldlock);
	}

#endif /* RULELOCKDEBUG */

	inlock =  RuleLockIntermediateToInternal (newlock);
	Assert (RuleLockIsValid (inlock));

	RelationAddHeapRuleLock (relrel, &tid, inlock);

	/* read the tuple back to see what was actually written */
	tup = RelationGetHeapTupleByItemPointer(relrel, SelfTimeQual,
						&tid, &buffer);
	if (!HeapTupleIsValid(tup)) {
		elog   (DEBUG, "problem with insert of lock");
	} else {
		inlock = HeapTupleGetRuleLock (tup, buffer);
		if (!RuleLockIsValid (inlock)) {
			elog   (DEBUG, "problem with insert of lock");
		}
		if (inlock != (RuleLock) NULL){
			elog (DEBUG, "written out lock");
			RuleLockIntermediateDump (RuleLockInternalToIntermediate (inlock));
		}
	}

	/* the scan started above must be ended, as elsewhere in this file */
	HeapScanEnd(sd);
}

/*
 *	RuleLockIntermediateFindEWLock
 *
 *	Searches the chain of rulepacks starting at 'rulelock' for a lock of
 *	type RuleLockTypeWrite that belongs to a rulepack whose isearly flag
 *	is set.
 *
 *	Both the rulepack chain (next_rulepack) and each lock chain
 *	(next_lock) are followed until their NULL terminator.  Rulepacks
 *	that are not early, or that carry no locks, are skipped and the
 *	search continues with the next rulepack.
 *
 *	Returns 1 if such a lock exists, 0 otherwise (including when
 *	'rulelock' is NULL).
 */
int
RuleLockIntermediateFindEWLock (rulelock)
	RuleLockIntermediate	*rulelock;
{
	RuleLockIntermediateLockData 	*lockpack;
	RuleLockIntermediate		*lock;

	for (lock = rulelock;
	     lock != (RuleLockIntermediate *) NULL;
	     lock = lock->next_rulepack) {
		/* only early rulepacks can contribute a match */
		if (!lock->isearly) {
			continue;
		}
		for (lockpack = lock->locks;
		     lockpack != (RuleLockIntermediateLockData *) NULL;
		     lockpack = lockpack->next_lock) {
			if (lockpack->locktype == RuleLockTypeWrite) {
				return (1);
			}
		}
	}
	return (0);
}


/*
 *	RuleInsertCatalog
 *
 *	Put an all-null tuple into the catalogs so that we can get back a
 *	usable rule id.
 *
 *	Called by the traffic cop, which has to provide a valid rule id
 *	to the planner so that the latter can insert into the rule locks
 *	it generates.
 *
 *	Returns the new rule relation OID.
 */
ObjectId
RuleInsertCatalog()
{
	register		i;
	Relation		ruleRelation;
	HeapTuple		heapTuple;
	char			*values[RuleRelationNumberOfAttributes];
	char			nulls[RuleRelationNumberOfAttributes];
	ObjectId		objectId;

	for (i = 0; i < RuleRelationNumberOfAttributes; ++i) {
		values[i] = NULL;
		nulls[i] = 'n';
	}
	ruleRelation = RelationNameOpenHeapRelation(RuleRelationName);
	if (!RelationIsValid(ruleRelation)) {
		elog(WARN, "RuleInsertCatalog: could not open relation %s",
		     RuleRelationName);
		return((ObjectId) 0);
	}
	heapTuple = FormHeapTuple(RuleRelationNumberOfAttributes,
				  &ruleRelation->rd_att, values, nulls);
	/* XXX What do we do with rule locks here?? */
	(void) RelationInsertHeapTuple(ruleRelation, heapTuple,
				       (double *) NULL);
	objectId = heapTuple->t_oid;
	RelationCloseHeapRelation(ruleRelation);
	return(objectId);
}


/*
 *	RuleModifyCatalog
 *
 *	Find the shell tuple created by RuleInsertCatalog and update it with
 *	the proper values.
 *
 *	Called by the executor when the locks are to be stuffed into the
 *	catalogs.
 *
 *	ruleObjectId identifies the shell tuple (matched on the object id
 *	attribute).  The remaining arguments become the first nine attribute
 *	values of the replacement tuple, in the order rulname, rulowner,
 *	rulserver, rulportal, rulpriority, rulstatus, rulkind, rulstat1,
 *	rulstat2.
 *
 *	Reports through elog(WARN) when a rule named rulname already exists
 *	or when no tuple with object id ruleObjectId is found.  The code
 *	keeps using the scan results after those calls, so it presumably
 *	relies on elog(WARN) not returning -- TODO confirm.
 *
 *	Returns nothing of interest.
 */
void
RuleModifyCatalog(ruleObjectId,
		  rulname, rulowner, rulserver, rulportal, rulpriority,
		  rulstatus, rulkind, rulstat1, rulstat2)
	ObjectId	ruleObjectId;
	Name		rulname;
	ObjectId 	rulowner;
	ObjectId 	rulserver;
	Name		rulportal;
	uint16	 	rulpriority;
	char  		rulstatus;
	char 		rulkind;
	RegProcedure  	rulstat1; 
	RegProcedure  	rulstat2;
{
	register		i;
	Relation		relation;
	HeapScan		heapScan;
	HeapTuple		heapTuple;
	Buffer                  buffer;
	
	
	/*
	 * Scan keys are static; only their argument field is filled in at
	 * run time, just before each scan.
	 */
	static ScanKeyEntryData	ruleNameKey[1] = {
		{ 0, RuleNameAttributeNumber, F_CHAR16EQ }
	};
	static ScanKeyEntryData	objectIdKey[1] = {
		{ 0, ObjectIdAttributeNumber, F_OIDEQ }
	};
	char			*values[RuleRelationNumberOfAttributes];
	char			nulls[RuleRelationNumberOfAttributes];
	char			replaces[RuleRelationNumberOfAttributes];

	/*
	 * Make sure that a rule with this name hasn't already been inserted.
	 * XXX Is rulname really the right thing to test here??
	 */
	ruleNameKey[0].argument.name.value = rulname;
	relation = RelationNameOpenHeapRelation(RuleRelationName);
	heapScan = RelationBeginHeapScan(relation, 0, NowTimeQual,
					 1, (ScanKey) ruleNameKey);
	heapTuple = HeapScanGetNextTuple(heapScan, 0, (Buffer *) NULL);
	if (HeapTupleIsValid(heapTuple)) {
		/* release the scan and relation before reporting the error */
		HeapScanEnd(heapScan);
		RelationCloseHeapRelation(relation);
		elog(WARN, "RuleModifyCatalog: rule %s already defined",
		     rulname);
	}
	HeapScanEnd(heapScan);
	RelationCloseHeapRelation(relation);
	
	/*
	 * Make sure that the rule's 'rulserver' process exists.
	 * XXX This doesn't actually work yet, I assume...
	 *
	 * NOTE(review): this check looks rulserver up by object id in the
	 * rule relation itself -- confirm that is the intended relation
	 * before enabling HAS_RULSERVER.
	 */
#ifdef HAS_RULSERVER
	objectIdKey[0].argument.objectId.value = rulserver;
	relation = RelationNameOpenHeapRelation(RuleRelationName);
	heapScan = RelationBeginHeapScan(relation, 0, NowTimeQual,
					 1, (ScanKey) objectIdKey);
	heapTuple = HeapScanGetNextTuple(heapScan, 0, (Buffer *) NULL);
	if (!HeapTupleIsValid(heapTuple)) {
		HeapScanEnd(heapScan);
		RelationCloseHeapRelation(relation);
		elog(WARN, "RuleModifyCatalog: rule %s has no server %d",
		     rulname, rulserver);
	}
	HeapScanEnd(heapScan);
	RelationCloseHeapRelation(relation);
#endif /* HAS_RULSERVER */

	/*
	 * Fill in the new tuple values and replace the shell tuple.
	 *
	 * The two names go through fmgr(F_CHAR16IN, ...); every other
	 * argument is stored by value, cast to char *.  Only the first nine
	 * entries of values[] are assigned here.
	 */
	i = 0;
	values[i++] = fmgr(F_CHAR16IN, (char *) rulname);
	values[i++] = (char *) rulowner;
	values[i++] = (char *) rulserver;
	values[i++] = fmgr(F_CHAR16IN, (char *) rulportal);
	values[i++] = (char *) rulpriority;
	values[i++] = (char *) rulstatus;
	values[i++] = (char *) rulkind;
	values[i++] = (char *) rulstat1;
	values[i++] = (char *) rulstat2;
	/* every attribute gets the same ' ' null flag and 'r' replace flag */
	for (i = 0; i < RuleRelationNumberOfAttributes; ++i) {
		nulls[i] = ' ';
		replaces[i] = 'r';
	}
	/* locate the shell tuple by its object id */
	objectIdKey[0].argument.objectId.value = ruleObjectId;
	relation = RelationNameOpenHeapRelation(RuleRelationName);
	heapScan = RelationBeginHeapScan(relation, 0, SelfTimeQual,
					 1, (ScanKey) objectIdKey);
	heapTuple = HeapScanGetNextTuple(heapScan, 0,  &buffer);
	if (!HeapTupleIsValid(heapTuple)) {
		HeapScanEnd(heapScan);
		RelationCloseHeapRelation(relation);
		elog(WARN, "RuleModifyCatalog: rule %d not defined",
		     ruleObjectId);
	}
	/* build the replacement tuple from the shell tuple and values[] */
	heapTuple = ModifyHeapTuple(heapTuple, buffer, relation,
				    values, nulls, replaces);
	/* XXX What do we do with rule locks here?? */
	/* the override is switched on only around the replace call below */
	setheapoverride  (1); /* XXX */
#ifdef  RULELOCKDEBUG 
	elog  (DEBUG, "RuleModifyCatalog: tuple  is ");
	debugtup (heapTuple, &relation->rd_att);
#endif  

	(void) RelationReplaceHeapTuple(relation, &heapTuple->t_ctid,
					heapTuple, (double *) NULL);
	setheapoverride (0);
	HeapScanEnd(heapScan);
	RelationCloseHeapRelation(relation);
}


/*
 *	RulePlansCatalogInsert
 *
 *	Returns nothing of interest.
 */
void
RulePlansCatalogInsert(ruleObjectId, rulePlanId, rulePlan)
	ObjectId	ruleObjectId;
	int32	 	rulePlanId;
	char		*rulePlan;	/* character string */
{
	register		i;
	Relation		relation;
	HeapScan		heapScan;
	HeapTuple		heapTuple;
	static ScanKeyEntryData	ruleKey[1] = {
		{ 0, ObjectIdAttributeNumber, F_OIDEQ }
	};

	static ScanKeyEntryData	rulePlansKey[2] = {
		{ 0, RulePlansRuleIdAttributeNumber, F_OIDEQ },
		{ 0, RulePlansPlanIdAttributeNumber, F_INT4EQ }
	};
	char			*values[RulePlansRelationNumberOfAttributes];
	char			nulls[RulePlansRelationNumberOfAttributes];

	/*
	 * Make sure that the rule exists but the plan doesn't
	 */
	ruleKey[0].argument.objectId.value = ruleObjectId;
	relation = RelationNameOpenHeapRelation(RuleRelationName);
	heapScan = RelationBeginHeapScan(relation, 0, SelfTimeQual,
					 1, (ScanKey) ruleKey);
	heapTuple = HeapScanGetNextTuple(heapScan, 0, (Buffer *) NULL);
	HeapScanEnd(heapScan);
	RelationCloseHeapRelation(relation);
	if (!HeapTupleIsValid(heapTuple))
		elog(WARN, "RulePlansCatalogInsert: rule %d undefined",
		     ruleObjectId);


	rulePlansKey[0].argument.objectId.value = ruleObjectId;
	rulePlansKey[1].argument.integer32.value = rulePlanId;
	relation = RelationNameOpenHeapRelation(RulePlansRelationName);
	heapScan = RelationBeginHeapScan(relation, 0, NowTimeQual,
					 2, (ScanKey) rulePlansKey);
	heapTuple = HeapScanGetNextTuple(heapScan, 0, (Buffer *) NULL);
	HeapScanEnd(heapScan);
	RelationCloseHeapRelation(relation);
	if (HeapTupleIsValid(heapTuple))
		elog(WARN,
		     "RulePlansCatalogInsert: ruleplan %d %d already defined",
		     ruleObjectId, rulePlanId);
	
	for (i = 0; i < RulePlansRelationNumberOfAttributes; ++i)
		nulls[i] = ' ';
	values[RulePlansRuleIdAttributeNumber-1] = (char *) ruleObjectId;
	values[RulePlansPlanIdAttributeNumber-1] = (char *) rulePlanId;
	values[RulePlansPlanAttributeNumber-1] = fmgr(F_TEXTIN, rulePlan);
	
        heapTuple = FormHeapTuple(RulePlansRelationNumberOfAttributes,
                                  &relation->rd_att, values, nulls);
	/* XXX What do we do with rule locks here?? */
	(void) RelationInsertHeapTuple(relation, heapTuple, (double *) NULL);
	RelationCloseHeapRelation(relation);
}


/*
 *	RulePlansCatalogExtract
 *
 *	Looks up the rule plan tuple for <ruleObjectId, rulePlanId> (both
 *	passed as LISP integers) through the system cache under
 *	RULEPLANCODE and extracts its plan attribute.
 *
 *	The rule plans relation is opened only to obtain the tuple
 *	descriptor that amgetattr needs, and is closed again as soon as the
 *	attribute has been fetched.
 *
 *	Returns a LISP string containing a rule plan, or LispNil (after a
 *	DEBUG log message) when the tuple is not found or the plan attribute
 *	is not a valid pointer.
 */
LispValue
RulePlansCatalogExtract(ruleObjectId, rulePlanId)
	LispValue	ruleObjectId, rulePlanId;
{
	HeapTuple	tup;
	Relation	relation;
	char		*plan;
	Boolean		isnull;
	extern char	*amgetattr();	/* XXX */

	tup = SearchSysCacheTuple(RULEPLANCODE,
				  (char *) LISPVALUE_INTEGER(ruleObjectId),
				  (char *) LISPVALUE_INTEGER(rulePlanId),
				  (char *) NULL, (char *) NULL);
	if (!HeapTupleIsValid(tup)) {
		elog(DEBUG, "RulePlansCatalogExtract: no rule <%d %d>",
		     LISPVALUE_INTEGER(ruleObjectId),
		     LISPVALUE_INTEGER(rulePlanId));
		return(LispNil);
	}
	relation = RelationNameOpenHeapRelation(RulePlansRelationName);
	plan = amgetattr(tup, UnknownBuffer,
			 RulePlansPlanAttributeNumber,
			 &relation->rd_att, &isnull);
	/* the descriptor is no longer needed on either path below */
	RelationCloseHeapRelation(relation);
	if (!PointerIsValid(plan)) {
		elog(DEBUG, "RulePlansCatalogExtract: no valid rule <%d %d>",
		     LISPVALUE_INTEGER(ruleObjectId),
		     LISPVALUE_INTEGER(rulePlanId));
		return(LispNil);
	}
	return(lispString(plan));
}

/*
 * found in Philip's directories....
 * does not compile....
 */
/*
RuleDeleteCatalog (ruleObjectId)
	LispValue ruleObjectId;
{
	register 	i;
	Relation 	ruleRelation;
	HeapTuple	heapTuple;
	HeapScan 	heapScan;
	
	

	ruleRelation = RelationNameOpenHeapRelation (RuleRelationName);

	if (!RelationIsValid(ruleRelation)) {
		elog(WARN, "RuleInsertCatalog: could not open relation %s",
		     RuleRelationName);
		
	}


	heapScan = RelationBeginHeapScan(relation, 0, NowTimeQual,
					 1, (ScanKey) objectIdKey);
	heapTuple = HeapScanGetNextTuple(heapScan, 0, (Buffer *) NULL);
	if (!HeapTupleIsValid(heapTuple)) {
		HeapScanEnd(heapScan);
		RelationCloseHeapRelation(relation);
		elog(WARN, "RuleModifyCatalog: rule %s has no server %d",
		     rulname, rulserver);
	}
}
*/
