
/* ----------------------------------------------------------------
 *			  Utilities to read/construct plans 
 *
 * $Header: /private/postgres/src/test/RCS/testruleplan.c,v 1.12 1992/07/04 04:03:33 mao Exp $
 * ----------------------------------------------------------------
 */



#include <stdio.h>
#include "catalog/catname.h"
#include "access/tupdesc.h"
#include "access/ftup.h"
#include "utils/log.h"
#include "tcop/tcop.h"
#include "rules/prs2.h"
#include "rules/prs2stub.h"
#include "access/heapam.h"
#include "utils/rel.h"
#include "executor/executor.h"
#include "planner/keys.h"
#include "nodes/plannodes.h"
#include "nodes/plannodes.a.h"
#include "nodes/primnodes.h"
#include "nodes/primnodes.a.h"
#include "tcop/dest.h"

/*--------------------- DEFINITIONS ---------------------------------*/
#define MAX_RELATIONS		100
#define MAX_NAME_LENGTH		100

#define IMPORT_ATTRNO(relNo)	(2*(relNo))
#define EXPORT_ATTRNO(relNo)	(2*(relNo)-1)

#define ISBLANK(x)	((x)==' ' || (x)=='\t' || (x)=='\n')
#define ISDIGIT(x)	((x)<='9' && (x)>='0')

/*----
 * some postgres OIDs hardwired in this program...
 * They might have to change....
 */
#define INT4_TYPE	23
#define INT4_LENGTH	4
#define INT4_EQUAL	65	/* NOTE: this is not a pg_operator.oid, */
				/* but a pg_proc.oid			*/
#define BOOL_TYPE	16

/*------
 * LOCK FORMAT
 */
#define LOCK_FORMAT \
"(numOfLocks: 1 (ruleId: %ld lockType: %c attrNo: %d planNo: %d partindx: %d npart: %d))"

/*--------------------- EXTERN VARIABLES& FUNCTIONS  ----------------*/
extern int TESTRULE_DEBUG_FLAG;	/* define in testrules.c */

extern Var RMakeVar();
extern Resdom RMakeResdom();
extern Oper RMakeOper();
extern Const RMakeConst();
extern Param RMakeParam();
extern JoinRuleInfo RMakeJoinRuleInfo();
extern SeqScan RMakeSeqScan();
extern NestLoop RMakeNestLoop();
extern Result RMakeResult();
extern double getTimer();
extern EState CreateExecutorState();
extern List QueryRewrite();

static Var my_make_var();
static Resdom my_make_resdom();
static Const my_make_const();
static Param my_make_param();
static List my_make_qual();
static JoinRuleInfo my_make_ruleinfo();
static void readName();
static Result my_make_constresult();
static Result my_make_paramresult();

List makePlanFromFile();
List makeTestNLRulePlan();
List makeNLRulePlan();
List make_parsetree();


/*==================================================================
 *
 * makePlanFromQuery
 *
 * Given a query create a list of 2-item lists containing
 *	a) a feature (either EXEC_RUN or EXEC_RETONE)
 *	b) a query descriptor
 *
 * If no query rewrite rules are involved then the top-level list must
 * have only one item...
 *
 * NOTE: we support a new kind of POSTGRES command, called "retone"
 * This command is tha same as retrieve, but it stops after the
 * first tuple...
 *
 * Returns LispNil if the query is not a valid one..
 */
List
makePlanFromQuery(queryString)
char *queryString;
{
    List plan, qDesc, parseTrees, parseTree;
    List res, t;
    char *s;
    char query[10000];
    int retoneFlag;
    bool unrewritten;
    List rewritten, oneItem;
    List operation;
    char cmd[100];
    int i;

    if (TESTRULE_DEBUG_FLAG) {
	printf("DEBUG: makePlanFromQuery: query='%s'\n", queryString);
	fflush(stdout);
    }

    /*
     * is it a "retone" command ??
     */
    retoneFlag = 0;
    s = queryString;
    /* 
     * skip blanks
     */
    while (*s == ' ' || *s == '\t' || *s == '\n')
	s++;
    /*
     * now find the first word
     */
    i=0;
    while ((s[i]>='a' && s[i]<='z') || (s[i]>='A' && s[i]<='Z')) {
	cmd[i] = s[i];
	i++;
    }
    cmd[i] = '\0';

    if (!strcmp(cmd, "retone")) {
	retoneFlag = 1;
	operation = lispAtom("retrieve");
        sprintf(query, "retrieve");
        strcat(query, s+6);
    } else if (!strcmp(cmd, "retrieve")) {
	operation = lispAtom("retrieve");
	strcpy(query, queryString);
    } else if (!strcmp(cmd, "replace")) {
	operation = lispAtom("replace");
	strcpy(query, queryString);
    } else if (!strcmp(cmd, "delete")) {
	operation = lispAtom("delete");
	strcpy(query, queryString);
    } else if (!strcmp(cmd, "append")) {
	operation = lispAtom("append");
	strcpy(query, queryString);
    } else {
	fprintf(stderr, "makePlanFromQuery: Illegal command '%s'\n", cmd);
	exitpg(1);
    }

    /*
     * parse the query...
     */
    parseTrees = lispList();
    parser(query, parseTrees);

    unrewritten = true;
    foreach (t, parseTrees) {
	ValidateParse(CAR(t));
	/*
	 * rewrite queries...
	 */
	if (unrewritten == true) {
	    if (( rewritten = QueryRewrite ( CAR(t) )) != LispNil) {
		CAR(t) = CAR(rewritten);
		CDR(last(rewritten)) = CDR(t);
		CDR(t) = CDR(rewritten);
		unrewritten = false;
		continue;
	    }
	    unrewritten = false;
	}

    }

    /*
     * create the plan & the query descriptor
     */
    res = LispNil;
    foreach (t, parseTrees) {
	parseTree = CAR(t);
	init_planner();
	plan = (List) planner(parseTree);
	qDesc = MakeQueryDesc(operation,
				    parseTree, plan, LispNil, LispNil,
				    LispNil, LispNil, lispInteger(0), None);

	/*
	 * OK, now create the lsit with the "feature", & the query Desc.
	 */
	if (retoneFlag) {
	    oneItem = lispCons(lispInteger(EXEC_RETONE),
			lispCons(qDesc, LispNil));
	} else {
	    oneItem = lispCons(lispInteger(EXEC_RUN),
			lispCons(qDesc, LispNil));
	}

	res = nappend1(res, oneItem);

    }

    if (TESTRULE_DEBUG_FLAG) {
	printf("DEBUG: makePlanFromQuery: RESULT = ");
	lispDisplay(res, 0);
	printf("\n");
    }

    return(res);
}


/*==================================================================
 *
 * makePlanFromFile
 *
 * read a Plan from a file. If fileName == NULL< then stdin is used
 *
 * The file format is as follows:
 *
 *  a) the number of the range table entries
 *  b) the relation names (there must be as many names as the
 *	number read in (a)
 *  c) a plan
 *
 * Then the program constructs a fake pase tree (containing
 * only the tange table) and finally a query descriptor (which can
 * be passed to ExecMain).
 * It returns a list containing 2-item list, the first item being
 * EXEC_RUN (an int) and the second the query descriptor.
 *
 */
List
makePlanFromFile(fname)
char *fname;
{
    FILE *fp;
    char *s;
    char relNames[MAX_RELATIONS][MAX_NAME_LENGTH];
    int c;
    int nRel;
    int i,j;
    List parseTree, plan, qDesc;
    List res;
    long size;
    char *realloc();
    List StringToPlan();

    if (fname == NULL) {
	/* 
	 * read from standard input
	 */
	fp = stdin;
    } else {
	fp = fopen(fname, "r");
	if (fp==NULL) {
	    elog(WARN, "Could not open file `%s'", fname);
	}
    }


    /*
     * read the number of range table entries
     */
    if (fscanf(fp, "%d", &nRel) !=1) {
	elog(WARN, "Could not read # of range table entries");
    }

    if (TESTRULE_DEBUG_FLAG) {
	printf("DEBUG: makePlanFromFile: rtable (%d entries)[", nRel);
	fflush(stdout);
    }

    if (nRel > MAX_RELATIONS) {
	elog(WARN, "Can not have more than %d relation in the rtable",
	    MAX_RELATIONS);
    }

    /*
     * read one by one the name of the relations & construct
     * the range table
     */
    for (i=0; i<nRel; i++) {
	readName(fp, relNames[i], 1);
	if (relNames[i][0] == '\0') {
	    elog(WARN, "EOF while reading relation names (%d/%d)", i+1, nRel);
	}
	if (TESTRULE_DEBUG_FLAG) {
	    printf(" %s", i, relNames[i]);
	    fflush(stdout);
	}
    }
    if (TESTRULE_DEBUG_FLAG) {
	printf("]\n");
	fflush(stdout);
    }

    parseTree = make_parsetree(nRel, relNames);

    /*
     * now read the plan
     */
    s = "()";
    size = 100;
    s = malloc(size);
    i = 0;
    while ((c=fgetc(fp))!=EOF) {
	s[i] = c;
	i++;
	if (i>=size) {
	    size = size + 100;
	    s = realloc(s, size);
	}
    }
    s[i] = '\0';

    plan = StringToPlan(s);

    qDesc = MakeQueryDesc(lispAtom("retrieve"),
			    parseTree,
			    plan,
			    LispNil,
			    LispNil,
			    LispNil,
			    LispNil,
			    lispInteger(0),
			    None);

    res = lispCons(lispInteger(EXEC_RETONE),
		lispCons(qDesc, LispNil));
    res = lispCons(res, LispNil);

    if (TESTRULE_DEBUG_FLAG) {
	printf("DEBUG: makePlanFromFile: RESULT = ");
	lispDisplay(res, 0);
	printf("\n");
    }
    
    return(res);
}


/*==============================================================
 *
 * makePlanFromNLRule
 *
 * create a plan suitable for testing rule stubs...
 *
 * This plan corresponds to a query of the form:
 *
 * 	REL1.attr1 = REL2.attr2 and 
 *	REL2.attr3 = REL3.attr4 and 
 *	... and RELn.attrk = C
 *
 * where C is an integer constant
 * All the attributes attr1, attr2 ... are assumed to be of type `int4'
 *
 * ARGUMENTS:
 * the argument is a string containing the following info:
 * a) the number N of relations  (an int)
 * b) the relation names, i.e. N names (with no quotes!)
 * c) the attribute numbers attr1, attr2, etc.
 *	these are (2*N-1) integers.
 * d) the constant value 'C'
 *
 * For example a valid string is:
 *	'3 R1 R2 R3 2 3 4 2 1 123'
 *
 *----------
 * First we call makeTestNLRulePlan to create the plan,
 * and the we create a query descriptor and finally return
 * a 2-item list the first item being EXEC_RUN (an int)
 * and the second the query descriptor.
 *
 */
List
makePlanFromNLRule(string)
char *string;
{
    int nRel;
    char relNames[MAX_RELATIONS][MAX_NAME_LENGTH];
    AttributeNumber attrnos[2*MAX_RELATIONS-1];
    int constvalue;
    List plan, rangeTable, parseTree, qDesc;
    List res;
    char *s;
    int i,j;

    s = string;

    /*
     * parse the given string to find 'nRel', 'relNames',
     * 'attrnos' and 'constvalue'
     */
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    nRel = 0;
    while ((*s) && ISDIGIT(*s)) {
	nRel = 10 * nRel + (*s) - '0';
	s++;
    }
    if (nRel == 0) {
	fprintf(stderr, "makePlanFromNLRule: illegal string '%s'\n", string);
	exitpg(1);
    }
    for (i=0; i<nRel; i++) {
	while ((*s) && ISBLANK(*s)) s++;	/* skip blanks */
	j=0;
	while((*s) && !ISBLANK(*s)) {
	    relNames[i][j] = *s;
	    s++;
	    j++;
	}
	relNames[i][j] = '\0';
    }
    for (i=0; i<2*nRel-1; i++) {
	while ((*s) && ISBLANK(*s)) s++;	/* skip blanks */
	attrnos[i] = 0;
	while ((*s) && ISDIGIT(*s)) {
	    attrnos[i] = 10 * attrnos[i] + (*s) - '0';
	    s++;
	}
	if (attrnos[i] == 0) {
	    fprintf(stderr,
		"makePlanFromNLRule: illegal string '%s'\n", string);
	    exitpg(1);
	}
    }
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    constvalue = 0;
    while ((*s) && ISDIGIT(*s)) {
	constvalue = 10 * constvalue + (*s) - '0';
	s++;
    }


    if (TESTRULE_DEBUG_FLAG) {
	int k;
	printf("DEBUG: makePlanFromNLRule: string = '%s'\n", string);
	printf("DEBUG: makePlanFromNLRule:  nRel=%d [", nRel);
	for (k=0; k<nRel; k++){
	    printf(" %s", relNames[k]);
	}
	printf("] attrnos=[");
	for (k=0; k<2*nRel-1; k++){
	    printf(" %hd", attrnos[k]);
	}
	printf("] c=%d\n", constvalue);
	fflush(stdout);
    }

    /*----------
     * OK, now make the plan + parse tree etc....
     */
    plan = makeTestNLRulePlan(nRel, attrnos, true, constvalue, NULL);

    parseTree = make_parsetree(nRel, relNames);
    rangeTable = root_rangetable(parse_root(parseTree));
    qDesc = MakeQueryDesc(lispAtom("retrieve"),
			    parseTree,
			    plan,
			    LispNil,
			    LispNil,
			    LispNil,
			    LispNil,
			    lispInteger(0),
			    None);

    res = lispCons(lispInteger(EXEC_RETONE),
		lispCons(qDesc, LispNil));
    res = lispCons(res, LispNil);

    if (TESTRULE_DEBUG_FLAG) {
	printf("DEBUG: makePlanFromNLRule: RESULT=");
	lispDisplay(res,0);
	printf("\n");
	fflush(stdout);
    }

    return(res);

}


/*----------------------------------------------------------------
 * readName
 *
 * read a name at most MAX_NAME_LENGTH - 1 chars long
 * if 'skipNewlines' is zero, then we only look till
 * a '\n'. Otherwise we continue reading until we actually find a name
 * (i.e. a non blank character).
 */
static
void
readName(fp,s, skipNewlines)
FILE *fp;
char *s;
int skipNewlines;
{
    int i;
    int c;
    int done;

    /*
     * Skip Blanks
     */
    done = 0;
    do {
	c = fgetc(fp);
	if (c == EOF || (!skipNewlines && c == '\n')) {
	    /* 
	     * we have reached EOF (or the end of line
	     * in no-skipNewlines mode)
	     */
	    s[0] = '\0';
	    return;
	}
	if (c!= ' ' && c!= '\t' && c!='\n') {
	    /*
	     * we found the beginning of the name
	     */
	    done = true;
	}
    } while(!done);

    /*
     * read the name
     */
    i = 0;
    s[i] = c;
    while (c != EOF && c != ' ' && c != '\t' && c != '\n') {
	if (i<MAX_NAME_LENGTH-1) {
	    s[i] = c;
	    i++;
	}
	c = fgetc(fp);
    }
    s[i] = '\0';
}

/*-----------------------------------------------------------------
 *
 * my_make_var
 */

static
Var
my_make_var(varno, attno, varid)
Index varno;
AttributeNumber attno;
List varid;
{
    Var var;

    var = RMakeVar();
    set_varno(var, varno);
    set_varattno(var, attno);
    set_vartype(var, (ObjectId) INT4_TYPE);        /* type = int4 */
    set_varid(var, varid);
    
    return(var);
}


/*-----------------------------------------------------------------
 *
 * my_make_resdom
 */
static
Resdom
my_make_resdom(resno, resname)
AttributeNumber resno;
Name resname;
{
    Resdom res;

    res = RMakeResdom();

    set_resno(res, resno);
    set_restype(res, (ObjectId) INT4_TYPE);	/* int4 */
    set_reslen(res, (Size) INT4_LENGTH);
    set_resname(res, resname);
    set_reskeyop(res, 0);

    return(res);
}

/*-----------------------------------------------------------------
 *
 * my_make_const
 *
 * create an integer constant with the given value
 */
static
Const
my_make_const(value)
int value;
{
    Const cnst;

    cnst = RMakeConst();
    set_consttype(cnst, (ObjectId) INT4_TYPE);	/* int4 */
    set_constlen(cnst, (Size) INT4_LENGTH);
    set_constisnull(cnst, false);
    set_constbyval(cnst, true);
    set_constvalue(cnst, Int32GetDatum(value));

    return(cnst);
}
/*-----------------------------------------------------------------
 * my_make_param
 */
static
Param
my_make_param(id, name)
int id;
char *name;
{
    Param p;
    Name nnn;

    nnn = (Name) palloc(sizeof(NameData));
    bzero(nnn, sizeof(NameData));
    strcpy(nnn, name);

    p = RMakeParam();
    set_paramkind(p, PARAM_OLD);
    set_paramid(p, (AttributeNumber) id);
    set_paramname(p, nnn);
    set_paramtype(p, (ObjectId) 0);

    return(p);
}

/*-----------------------------------------------------------------
 *
 * my_make_qual
 *
 * make a qual of the form "var1 = var2" (where var1 & var2
 * are int4
 */
static
List
my_make_qual(operand1, operand2)
Node operand1;
Node operand2;
{
    Oper oper;
    List qual, qualClause;

    oper = RMakeOper();
    set_opno(oper, (ObjectId) INT4_EQUAL);	/* `=' for int4 */
    set_oprelationlevel(oper, LispNil);
    set_opresulttype(oper, (ObjectId) BOOL_TYPE);	/* bool */
    set_op_fcache(oper, NULL);

    qualClause = lispCons(oper, LispNil);
    qualClause = nappend1(qualClause, operand1);
    qualClause = nappend1(qualClause, operand2);

    qual = lispCons(qualClause, LispNil);

    return(qual);
}

/*-----------------------------------------------------------------
 *
 * my_make_ruleinfo
 */
static
JoinRuleInfo
my_make_ruleinfo(stubId, innerattrno, outerattrno, lockString)
int stubId;
AttributeNumber innerattrno;
AttributeNumber outerattrno;
char *lockString;
{
    JoinRuleInfo ruleInfo;
    ObjectId ruleId;
    RuleLock lock;


    ruleInfo = RMakeJoinRuleInfo();

    lock = StringToRuleLock(lockString);

    set_jri_operator(ruleInfo, (ObjectId)65);	/* '=' for int4 */
    set_jri_inattrno(ruleInfo, innerattrno);
    set_jri_outattrno(ruleInfo, outerattrno);
    set_jri_lock(ruleInfo, lock);
    set_jri_ruleid(ruleInfo, ruleId);
    set_jri_stubid(ruleInfo, (Prs2StubId) stubId);
    set_jri_stub(ruleInfo, (Prs2OneStub) NULL);
    set_jri_stats(ruleInfo, (Prs2StubStats) NULL);
    return(ruleInfo);
}


/*-----------------------------------------------------------------
 *
 * my_make_constresult
 */
static
Result
my_make_constresult(constvalue)
int constvalue;
{
    Result node;
    List tlist;
    Resdom res;
    Const cnst;

    /*
     * construct the target list
     */
    res = my_make_resdom((AttributeNumber) 1, "const");
    cnst = my_make_const(constvalue);
    tlist = lispCons(
		lispCons(res, lispCons(cnst, LispNil)),
		LispNil);

    /*
     * now construct the node
     */
    node = RMakeResult();

    set_cost((Plan) node, 0.0);
    set_fragment((Plan) node, 0);
    set_state((Plan) node, NULL);
    set_qptargetlist((Plan)node, tlist);
    set_qpqual((Plan) node, LispNil);
    set_lefttree((Plan)node, LispNil);
    set_righttree((Plan)node, LispNil);
    set_resrellevelqual(node, LispNil);
    set_resconstantqual(node, LispNil);
    set_resstate(node, NULL);
    
    return(node);
}

/*-----------------------------------------------------------------
 *
 * my_make_paramresult
 */
static
Result
my_make_paramresult(id, name)
int id;
char *name;
{
    Result node;
    List tlist;
    Resdom res;
    Param param;

    /*
     * construct the target list
     */
    res = my_make_resdom((AttributeNumber) 1, "param");
    param = my_make_param(id, name);
    tlist = lispCons(
		lispCons(res, lispCons(param, LispNil)),
		LispNil);

    /*
     * now construct the node
     */
    node = RMakeResult();

    set_cost((Plan) node, 0.0);
    set_fragment((Plan) node, 0);
    set_state((Plan) node, NULL);
    set_qptargetlist((Plan)node, tlist);
    set_qpqual((Plan) node, LispNil);
    set_lefttree((Plan)node, LispNil);
    set_righttree((Plan)node, LispNil);
    set_resrellevelqual(node, LispNil);
    set_resconstantqual(node, LispNil);
    set_resstate(node, NULL);
    
    return(node);
}

/*==========================================================================
 *
 * makeTestNLRulePlan
 *
 * Create a plan suitable for testing rule stubs
 *
 * This plan corresponds to a query of the form:
 *
 * 	REL1.attr1 = REL2.attr2 and 
 *	REL2.attr3 = REL3.attr4 and 
 *	... and RELn.attrk = C
 *
 * All the attributes attr1, attr2 ... are assumed to be of type `int4'
 *
 * Parameters:
 *	nRel = number of relations involved
 *	attrs = an array of attribute numbers. there must be
 *		(2 * nRel - 1) entries in this array.
 *
 *	isconstant: if true, then 'C' is a integer constant
 *		with a value equal to `value'. (in this case
 *		`name' is ignored).
 *		If `isconstant' is false, then 'C' is a Param
 *		with type = PARAM_OLD, name = `name' and id
 *		(i.e. attribute number) = `value'.
 */

List
makeTestNLRulePlan(nRel, attrNumbers, isconstant, value, name)
int nRel;
AttributeNumber *attrNumbers;
bool isconstant;
int value;
char *name;
{
    SeqScan scans[MAX_RELATIONS];
    NestLoop nloops[MAX_RELATIONS];
    char lockString[1000];
    int i;

    if (nRel > MAX_RELATIONS) {
	elog(WARN, "Only %d relations allowed", nRel);
    }

    /*
     * create the scan nodes, one for each relation
     */
    for (i=0; i<nRel; i++) {
	Index scanrelid;
	List qual;
	List targetList;
	AttributeNumber attnoImport, attnoExport;
	Var var1, var2;
	Resdom res1, res2;

	scanrelid = (Index) (i+1);
	/*
	 * the target list will only contain the 2 attributes
	 * used in Joins. One of them is used in the qualification
	 * of the parent 'NestLoop' node (`import' attribute)
	 * and the other one is the one that will be actually
	 * projected by this NestLoop (export attribute).
	 *
	 */

	qual = LispNil;

	attnoImport = attrNumbers[IMPORT_ATTRNO(i)];
	/*
	 * a special case: if this is the righmost
	 * scan node (i.e. scanrelid == 1)
	 * then have its export attribute equal to 1
	 */
	if (i==0)
	    attnoExport = (AttributeNumber) 1;
	else
	    attnoExport = attrNumbers[EXPORT_ATTRNO(i)];
	var1 = my_make_var(scanrelid, attnoImport,
		    lispCons(lispInteger(scanrelid),
			lispCons(lispInteger(attnoImport), LispNil)));
	var2 = my_make_var(scanrelid, attnoExport,
		    lispCons(lispInteger(scanrelid),
			lispCons(lispInteger(attnoExport), LispNil)));
	res1 = my_make_resdom((AttributeNumber)1, "null");
	res2 = my_make_resdom((AttributeNumber)2, "null");
	targetList = lispCons(
			lispCons(res1, lispCons(var1, LispNil)),
			LispNil);
	targetList = nappend1(targetList,
			lispCons(res2, lispCons(var2, LispNil)));

	scans[i] = RMakeSeqScan();
	set_fragment ( scans[i], 0 );
	set_parallel ( scans[i], 1 );
	set_state (scans[i], (EState)NULL);
	set_qptargetlist (scans[i], targetList);
	set_qpqual (scans[i] , qual);
	set_lefttree (scans[i], (Plan) NULL);
	set_righttree (scans[i] , (Plan) NULL);
	set_scanrelid (scans[i] , scanrelid);
	set_scanstate (scans[i], (ScanState)NULL);
    }

    /*
     * OK, now create the NestLoop nodes
     */
    for (i=0; i<nRel; i++) {
	Var varjoin1, varjoin2, varout;
	Node node;
	Resdom res;
	AttributeNumber attnoInner, attnoOuter;
	Plan leftTree, rightTree;
	List innerTargetList, outerTargetList, targetList;
	List qual;
	JoinRuleInfo ruleInfo;
	AttributeNumber innerAttrNo;
	Result resultNode;
	List varid;

	if (i == 0) {
	    /*
	     * leftmost NestLoop node.
	     * the only one with its left children being a RESULT node.
	     * all the others have lefttree = nestloop and
	     * righttree = scan node.
	     */

	    if (isconstant)
		resultNode = my_make_constresult(value);
	    else
		resultNode = my_make_paramresult(value, name);
	    leftTree = (Plan) resultNode;
	} else {
	    leftTree = (Plan) nloops[i-1];
	}
	rightTree = (Plan) scans[nRel-1-i];
	innerTargetList = get_qptargetlist(rightTree);
	outerTargetList = get_qptargetlist(leftTree);

	/*
	 * Now create the target list
	 */
	res = my_make_resdom((AttributeNumber)1, "foo");
	varout = my_make_var((Index)INNER, (AttributeNumber)2,
		    get_varid(CADR(CADR(innerTargetList))));
	targetList = lispCons(
			lispCons(res, lispCons(varout, LispNil)),
			LispNil);
	
	/*
	 * now the qualification
	 */
	if (i != 0) {
	    /*
	     * this is not the leftmost nset loop node
	     */
	    varid = get_varid(CADR(CAR(outerTargetList)));
	    varjoin1 = my_make_var((Index)OUTER, (AttributeNumber)1, varid);
	    varid = get_varid(CADR(CAR(innerTargetList)));
	    varjoin2 = my_make_var((Index)INNER, (AttributeNumber)1, varid);
	    qual = my_make_qual(varjoin1, varjoin2);
	} else {
	    /*
	     * this is the the one! Its left tree is a Result node
	     */
	    node = (Node) CADR(CAR(outerTargetList));
	    varid = get_varid(CADR(CAR(innerTargetList)));
	    varjoin2 = my_make_var((Index)INNER, (AttributeNumber)1, varid);
	    qual = my_make_qual(node, varjoin2);
	}

	varid = get_varid(CADR(CADR(innerTargetList)));
	innerAttrNo = (AttributeNumber) CInteger(CADR(varid));


	/*
	 * now create the rule info for that node
	 */
	sprintf(lockString, LOCK_FORMAT,
	    100+i, 'Z', innerAttrNo, 1, 0, 0);
	ruleInfo = my_make_ruleinfo(i+1, innerAttrNo, (AttributeNumber) 1,
				lockString);
	/*
	 * finally the NestLoop itself!
	 */
	nloops[i] = RMakeNestLoop();
	set_cost (nloops[i] , 0.0 );
	set_fragment (nloops[i], 0 );
	set_parallel (nloops[i], 1 );
	set_state (nloops[i], (EState)NULL);
	set_qptargetlist (nloops[i], targetList);
	set_qpqual (nloops[i] , qual);
	set_lefttree (nloops[i], leftTree);
	set_righttree (nloops[i] , rightTree);
	set_nlstate (nloops[i], (NestLoopState)NULL);
	set_ruleinfo(nloops[i], ruleInfo);
    }

    /*
     * WE ARE DONE !!!!!
     */
    return((List)nloops[nRel-1]);
}

/*-----------------------------------------------------------------------
 * printRulePlanStats
 *-----------------------------------------------------------------------
 */
printRulePlanStats(plan, rangeTable)
Plan plan;
List rangeTable;
{
    JoinRuleInfo ruleInfo;
    Prs2StubStats stats;
    Plan leftTree, rightTree;
    Index scanrelid;
    List rt_entry;
    char *relName;

    if (ExecIsNestLoop(plan)) {
	/*
	 * find the inner relation name
	 */
	rightTree = (Plan) get_righttree(plan);
	if (rightTree != NULL && ExecIsSeqScan(rightTree)){
	    scanrelid = get_scanrelid(rightTree);
	    rt_entry = nth(scanrelid-1, rangeTable);
	    relName = CString(rt_relname(rt_entry));
	}
	ruleInfo = get_ruleinfo(plan);
	if (ruleInfo != NULL) {
	    stats = get_jri_stats(ruleInfo);
	    if (stats != NULL) {
		printf("\n");
		printf(
		"+++ (printRuleStats) REL: %s, stubs +%d -%d, locks +%d, -%d\n",
		relName, stats->stubsAdded, stats->stubsDeleted,
		stats->locksAdded, stats->locksDeleted);
	    } else {
		printf("+++ (printRuleStats) REL: %s, --nil--\n", relName);
	    }
	}
    }
    leftTree = (Plan) get_lefttree(plan);
    rightTree = (Plan) get_righttree(plan);
    if (leftTree != NULL) {
	printRulePlanStats(leftTree, rangeTable);
    }
    if (rightTree != NULL) {
	printRulePlanStats(rightTree, rangeTable);
    }

}


/*-------------------------------------------------------------------
 * make_parsetree
 *
 * make a range table given the names of relations
 */
List
make_parsetree(nRel, relNames) 
int nRel;
char relNames[][MAX_NAME_LENGTH];
{
    int i;
    List rangeTable;
    List entry;
    ObjectId reloid;
    Relation rel;
    List parseTree;
    List root;

    rangeTable = LispNil;

    for (i=0; i<nRel; i++) {
	/*
	 * now find the reloid of the relation
	 */
	rel = RelationNameOpenHeapRelation(&(relNames[i][0]));
	reloid = RelationGetRelationId(rel);
	RelationCloseHeapRelation(rel);
	entry = lispCons(lispString(relNames[i]), LispNil);
	entry = nappend1(entry, lispString(relNames[i]));
	entry = nappend1(entry, lispInteger((int)reloid));
	entry = nappend1(entry, lispInteger(0));
	entry = nappend1(entry, LispNil);
	entry = nappend1(entry, LispNil);
	rangeTable = nappend1(rangeTable, entry);
    }

    /*
     * OK, now construct a fake parse tree.
     * As far as I know we only need:
     *  a) range table
     *  b) result Relation
     *  c) target list (only in a 'replace' command)
     */
    root = lispCons(lispInteger(1), LispNil);
    root = nappend1(root, lispAtom("retrieve"));
    root = nappend1(root, LispNil);
    root = nappend1(root, rangeTable);
    parseTree = lispCons(root, LispNil);

    return(parseTree);
}

/*==============================================================
 *
 * addRulePlan
 *
 * create a plan and add it to the pg_prs2plans
 *
 * This plan corresponds to a query of the form:
 *
 * 	REL1.attr1 = REL2.attr2 and 
 *	REL2.attr3 = REL3.attr4 and 
 *	... and RELn.attrk = PARAM
 *
 * where PARAM is a parameter.
 * All the attributes attr1, attr2 ... are assumed to be of type `int4'
 *
 * ARGUMENTS:
 * the argument is a string containing the following info:
 * a) the rule id
 * b) the plan id
 * c) the number N of relations  (an int)
 * d) the relation names, i.e. N names (with no quotes!)
 * e) the attribute numbers attr1, attr2, etc.
 *	these are (2*N-1) integers.
 * f) the parameter name and parameter id
 * g) the [nRel+1] locks to be associated with the stubs...
 *	of the nRel relations + the `export' lock to be stored in
 * 	the rule catalog allong with the generated plan.
 *
 * For example a valid string is:
 *	'123453 2 3 R1 R2 R3 2 3 4 2 1 salary 2 \
 *	(numOfLocks:0)	\
 *	(numOfLocks:1 ........) \
 *	(numOfLocks:1 ........)'
 *
 *----------
 * First we call makeTestNLRulePlan to create the plan,
 *
 */
List
addRulePlan(string)
char *string;
{
    int nRel;
    char relNames[MAX_RELATIONS][MAX_NAME_LENGTH];
    AttributeNumber attrnos[2*MAX_RELATIONS-1];
    int ruleId, planId;
    int paramId;
    char paramName[MAX_NAME_LENGTH];
    char locks[MAX_RELATIONS+1][MAX_NAME_LENGTH];
    char *s;
    int i,j;
    LispValue plan, parseTree, res;
    char *lockString;
    Portal portal;

    s = string;

    /*
     * parse the given string to find 'nRel', 'relNames',
     * 'attrnos' and 'constvalue'
     */
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    /*
     * read rule id
     */
    ruleId = 0;
    while ((*s) && ISDIGIT(*s)) {
	ruleId = 10 * ruleId + (*s) - '0';
	s++;
    }
    if (ruleId == 0) {
	fprintf(stderr, "addRulePlan: illegal string '%s'\n", string);
	exitpg(1);
    }
    /*
     * now the plan id
     */
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    planId = 0;
    while ((*s) && ISDIGIT(*s)) {
	planId = 10 * planId + (*s) - '0';
	s++;
    }
    if (planId == 0) {
	fprintf(stderr, "addRulePlan: illegal string '%s'\n", string);
	exitpg(1);
    }
    /*
     * now the # of relations & th relation names...
     */
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    nRel = 0;
    while ((*s) && ISDIGIT(*s)) {
	nRel = 10 * nRel + (*s) - '0';
	s++;
    }
    if (nRel == 0) {
	fprintf(stderr, "addRulePlan: illegal string '%s'\n", string);
	exitpg(1);
    }
    for (i=0; i<nRel; i++) {
	while ((*s) && ISBLANK(*s)) s++;	/* skip blanks */
	j=0;
	while((*s) && !ISBLANK(*s)) {
	    relNames[i][j] = *s;
	    s++;
	    j++;
	}
	relNames[i][j] = '\0';
    }
    for (i=0; i<2*nRel-1; i++) {
	while ((*s) && ISBLANK(*s)) s++;	/* skip blanks */
	attrnos[i] = 0;
	while ((*s) && ISDIGIT(*s)) {
	    attrnos[i] = 10 * attrnos[i] + (*s) - '0';
	    s++;
	}
	if (attrnos[i] == 0) {
	    fprintf(stderr,
		"addRulePlan: illegal string '%s'\n", string);
	    exitpg(1);
	}
    }
    /*
     * now the parameter name & id
     */
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    j=0;
    while((*s) && !ISBLANK(*s)) {
	paramName[j] = *s;
	s++;
	j++;
    }
    paramName[j] = '\0';
    while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
    paramId = 0;
    while ((*s) && ISDIGIT(*s)) {
	paramId = 10 * paramId + (*s) - '0';
	s++;
    }

    /*
     * finally read the locks.
     * read from the first '(' up to the enclosing ')'
     */
    for (i=0; i<nRel+1; i++) {
	int level;

	while ((*s) && ISBLANK(*s)) s++;		/* skip blanks */
	if (*s!='(') {
	    fprintf(stderr,
		"addRulePlan: illegal lock in '%s'\n", string);
	    exitpg(1);
	}
	level = 1;
	j=0;
	locks[i][j] = *s;
	j++;
	s++;
	while(level >0) {
	    if (*s == '\0') {
		fprintf(stderr,
		    "addRulePlan: illegal lock in '%s'\n", string);
		exitpg(1);
	    }
	    if (*s == ')')
		level--;
	    if (*s == '(')
		level++;
	    locks[i][j] = *s;
	    s++;
	    j++;
	}
	locks[i][j] = '\0';
    }
    
    if (TESTRULE_DEBUG_FLAG) {
	int k;
	printf("DEBUG: addRulePlan: string = '%s'\n", string);
	printf("DEBUG: addRulePlan:  nRel=%d [", nRel);
	for (k=0; k<nRel; k++){
	    printf(" %s", relNames[k]);
	}
	printf("] attrnos=[");
	for (k=0; k<2*nRel-1; k++){
	    printf(" %hd", attrnos[k]);
	}
	printf("] param=$%s (%d)\n", paramName, paramId);
	fflush(stdout);
    }

    /*----------
     * OK, now make the plan + parse tree etc....
     */
    plan = makeNLRulePlan(nRel, attrnos, paramName, paramId, locks);
    parseTree = make_parsetree(nRel, relNames);

    if (TESTRULE_DEBUG_FLAG) {
	printf("DEBUG: addRulePlan: PLAN=");
	lispDisplay(plan);
	printf("\n");
	fflush(stdout);
    }

    lockString = &(locks[nRel][0]);
    res = nappend1(res, lispString(lockString));
    res = nappend1(res,
		lispCons(parseTree, lispCons(plan, LispNil)));
    res = lispCons(lispString(Prs2RulePlanType_EXPORT), res);
    
    StartTransactionCommand(portal);
    prs2InsertRulePlanInCatalog(ruleId, planId, res);
    CommitTransactionCommand();

}

/*==========================================================================
 *
 * makeNLRulePlan
 *
 * Create a plan for an export/import lock.
 *
 * This plan corresponds to a query of the form:
 *
 * 	REL(0).attr(0) = REL(1).attr(1) and 
 *	REL(1).attr(2) = REL(2).attr(3) and 
 *	... 
 *	REL(n-2).attr(k-3) = REL(n-1).attr(k-2) and 
 *	REL(n-1).attr(k-1) = PARAMETER
 *
 * where k = 2*n-1
 *
 * All the attributes attr1, attr2 ... are assumed to be of type `int4'
 *
 * Parameters:
 *	nRel = number of relations involved
 *	attrs = an array of attribute numbers. there must be
 *		(2 * nRel - 1) entries in this array.
 *	paramname, paramid: the name of attrno of PARAMETER
 *	planIds = an array (with nRel) elements, one for each
 *		node in the relation with the plan id for the
 *		corresponding stub.
 *	locks: the locks associated with each JoinRuleInfo
 *
 * The plan is a left deep tree with "n" scan nodes (one per
 * relation). Scan node "i" corresponds to relation "i" (i=0...n-1)
 * and all scan nodes have a null qualifcation except for
 * scan node "n-1" which has the qualifcation:
 *	REL(n-1).attr(k-1) = PARAMETER
 *
 * There are "n-1" Join nodes.
 * Join node "n-2" is the leftmost and has as left and right children
 * the scan nodes "n-1" and "n-2" respectively.
 * All other Join nodes "i" (i=0...n-3) have as left child
 * the Join node "i+1" and as right child the scan node "i".
 *
 * Every relation "REL(i)" has an import attribute: "attr(2*i)"
 * and en export attribute "attr(2*i-1)", with the exception
 * of "REL(0)" which does not have an export attribute.
 * (but in our code, we just assume an export attribute number = 1).
 * 
 * Each scan node has a target list with two elements, the first one
 * being the import attribute and the second one the export attribute
 * of the relation.
 * The only exception is scan node "n-1" (the leftmost one) which
 * only has one element, the export attribute of "REL(n-1)".
 *
 * Each Join has a target list with only one element, the export
 * attribute of the relation being scanned in its right subtree.
 * The join qualification for join node "i" is:
 *  OUTER.1 = INNER.1
 * (where OUTER.1 is the export attribute of REL(i-1) and INNER.1
 * is the import attribute of REL(i).
 * 
 */

List
makeNLRulePlan(nRel, attrNumbers, paramname, paramid, locks)
int nRel;
AttributeNumber *attrNumbers;
int paramid;
char *paramname;
char locks[MAX_RELATIONS+1][MAX_NAME_LENGTH];
{
    SeqScan scans[MAX_RELATIONS];
    NestLoop nloops[MAX_RELATIONS];
    int i;

    if (nRel > MAX_RELATIONS) {
	elog(WARN, "Only %d relations allowed", nRel);
    }

    /*
     * create the scan nodes, one for each relation
     */
    for (i=0; i<nRel; i++) {
	Index scanrelid;
	List qual;
	List targetList;
	AttributeNumber attnoImport, attnoExport;
	Var var1, var2;
	Resdom res1, res2;
	Param param;

	scanrelid = (Index) (i+1);
	/*
	 * the target list will only contain the 2 attributes
	 * used in Joins. One of them is used in the qualification
	 * of the parent 'NestLoop' node (`import' attribute)
	 * and the other one is the one that will be actually
	 * projected by this NestLoop (export attribute).
	 *
	 */
	attnoImport = attrNumbers[IMPORT_ATTRNO(i)];
	/*
	 * a special case: if this is the righmost
	 * scan node (i.e. scanrelid == 1)
	 * then have its export attribute equal to attrno=1
	 */
	if (i==0)
	    attnoExport = (AttributeNumber) 1;
	else
	    attnoExport = attrNumbers[EXPORT_ATTRNO(i)];
	/*
	 * another special case. If this is the leftmost node,
	 * (i==nRel-1) then the target list will contain
	 * only one element (the export attribute).
	 */
	if (i==nRel-1) {
	    var2 = my_make_var(scanrelid, attnoExport,
		    lispCons(lispInteger(scanrelid),
			lispCons(lispInteger(attnoExport), LispNil)));
	    res2 = my_make_resdom((AttributeNumber)2, "null");
	    targetList = lispCons(
			lispCons(res2, lispCons(var2, LispNil)),
			LispNil);
	} else {
	    /*
	     * normal case: the target list has first an element
	     * for the import attribute, and then one for the export
	     * attribute.
	     */
	    var1 = my_make_var(scanrelid, attnoImport,
		    lispCons(lispInteger(scanrelid),
			lispCons(lispInteger(attnoImport), LispNil)));
	    var2 = my_make_var(scanrelid, attnoExport,
		    lispCons(lispInteger(scanrelid),
			lispCons(lispInteger(attnoExport), LispNil)));
	    res1 = my_make_resdom((AttributeNumber)1, "null");
	    res2 = my_make_resdom((AttributeNumber)2, "null");
	    targetList = lispCons(
			lispCons(res1, lispCons(var1, LispNil)),
			LispNil);
	    targetList = nappend1(targetList,
			lispCons(res2, lispCons(var2, LispNil)));
	}
	/*
	 * Now the qualification. This is nil, unless if this
	 * is the leftmost node, in which case it must be
	 * something like "import_attribute = parameter"
	 */
	if (i==nRel-1) {
	    var1 = my_make_var(scanrelid, attnoImport,
		    lispCons(lispInteger(scanrelid),
			lispCons(lispInteger(attnoImport), LispNil)));
	    param = my_make_param(paramid, paramname);
	    qual = my_make_qual((Node)var1, (Node)param);
	} else {
	    qual = LispNil;
	}

	scans[i] = RMakeSeqScan();
	set_fragment ( scans[i], 0 );
	set_parallel ( scans[i], 1 );
	set_state (scans[i], (EState)NULL);
	set_qptargetlist (scans[i], targetList);
	set_qpqual (scans[i] , qual);
	set_lefttree (scans[i], (Plan) NULL);
	set_righttree (scans[i] , (Plan) NULL);
	set_scanrelid (scans[i] , scanrelid);
	set_scanstate (scans[i], (ScanState)NULL);
    }

    /*
     * OK, now create the Join nodes
     * NOTE: currently we only support NestLoop joins....
     */
    for (i=nRel-2; i>=0; i--) {
	Var varjoin1, varjoin2, varout;
	Node node;
	Resdom res;
	AttributeNumber attnoInner, attnoOuter;
	Plan leftTree, rightTree;
	List innerTargetList, outerTargetList, targetList;
	List qual;
	JoinRuleInfo ruleInfo;
	AttributeNumber innerAttrNo;
	List varid;

	if (i == nRel-2) {
	    /*
	     * leftmost NestLoop node.
	     * the only one with its left children being a scan node.
	     * all the others have lefttree = nestloop and
	     * righttree = scan node.
	     */
	    leftTree = (Plan) scans[nRel-1];
	} else {
	    leftTree = (Plan) nloops[i+1];
	}
	rightTree = (Plan) scans[i];
	innerTargetList = get_qptargetlist(rightTree);
	outerTargetList = get_qptargetlist(leftTree);

	/*
	 * Now create the target list
	 * This will only have the INNER.2, i.e. the
	 * export attribute of the inner relation "REL(i)"
	 */
	res = my_make_resdom((AttributeNumber)1, "foo");
	varout = my_make_var((Index)INNER, (AttributeNumber)2,
		    get_varid(CADR(CADR(innerTargetList))));
	targetList = lispCons(
			lispCons(res, lispCons(varout, LispNil)),
			LispNil);
	
	/*
	 * now the qualification
	 * this must be: "INNER.1 = OUTER.1"
	 */
	varid = get_varid(CADR(CAR(outerTargetList)));
	varjoin1 = my_make_var((Index)OUTER, (AttributeNumber)1, varid);
	varid = get_varid(CADR(CAR(innerTargetList)));
	varjoin2 = my_make_var((Index)INNER, (AttributeNumber)1, varid);
	qual = my_make_qual(varjoin1, varjoin2);

	/*
	 * now create the rule info for that node
	 */
	varid = get_varid(CADR(CADR(innerTargetList)));
	innerAttrNo = (AttributeNumber) CInteger(CADR(varid));
	ruleInfo = my_make_ruleinfo(
	    i,
	    innerAttrNo,
	    (AttributeNumber) 1,
	    &(locks[i][0]));

	/*
	 * finally the NestLoop itself!
	 */
	nloops[i] = RMakeNestLoop();
	set_cost (nloops[i] , 0.0 );
	set_fragment (nloops[i], 0 );
	set_parallel (nloops[i], 1 );
	set_state (nloops[i], (EState)NULL);
	set_qptargetlist (nloops[i], targetList);
	set_qpqual (nloops[i] , qual);
	set_lefttree (nloops[i], leftTree);
	set_righttree (nloops[i] , rightTree);
	set_nlstate (nloops[i], (NestLoopState)NULL);
	set_ruleinfo(nloops[i], ruleInfo);
    }

    /*
     * WE ARE DONE !!!!!
     */
    return((List)nloops[0]);
}
