/*-------------------------------------------------------------------------
 *
 * utility.c--
 *    Contains functions which control the execution of the POSTGRES utility
 *    commands.  At one time acted as an interface between the Lisp and C
 *    systems.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    /usr/local/devel/pglite/cvs/src/backend/tcop/utility.c,v 1.13 1995/03/17 20:26:55 andrew Exp
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "parser/dbcommands.h"
#include "access/xact.h"
#include "catalog/catalog.h"

#include "commands/async.h"
#include "commands/cluster.h"
#include "commands/command.h"
#include "commands/copy.h"
#include "commands/creatinh.h"
#include "commands/defrem.h"
#include "commands/purge.h"
#include "commands/rename.h"
#include "commands/view.h"
#include "commands/version.h"
#include "commands/vacuum.h"
#include "commands/recipe.h"

#include "nodes/parsenodes.h"
#include "parse.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/acl.h"
#include "rewrite/rewriteRemove.h"
#include "tcop/tcopdebug.h"
#include "tcop/dest.h"

#ifndef NO_SECURITY
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/syscache.h"
#endif


/* ----------------
 *	CHECK_IF_ABORTED() is used to avoid doing unnecessary
 *	processing within an aborted transaction block.
 * ----------------
 */
#define CHECK_IF_ABORTED() \
    if (IsAbortedTransactionBlockState()) { \
	elog(NOTICE, "(transaction aborted): %s", \
	     "queries ignored until END"); \
	commandTag = "*ABORT STATE*"; \
	break; \
    } \
    
/* ----------------
 *	general utility function invoker
 * ----------------
 */
void
ProcessUtility(Node *parsetree,
	       char *commandString,
	       CommandDest dest)
{
    String	commandTag = NULL;
    String	relname;
    Name	relationName;
    Name	userName;
    
    userName = GetPgUserName();
    
    switch (nodeTag(parsetree)) {
	/* ********************************
	 *	transactions
	 * ********************************
	 */
    case T_TransactionStmt:
	{
	    TransactionStmt *stmt = (TransactionStmt *)parsetree;
	    switch (stmt->command) {
	    case BEGIN_TRANS:
		commandTag = "BEGIN";
		CHECK_IF_ABORTED();
		BeginTransactionBlock();
		break;
		
	    case END_TRANS:
		commandTag = "END";
		EndTransactionBlock();
		break;
		
	    case ABORT_TRANS:
		commandTag = "ABORT";
		UserAbortTransactionBlock();
		break;
	    }
	}
	break;
      
	/* ********************************
	 *	portal manipulation
	 * ********************************
	 */
    case T_ClosePortalStmt:
	{
	    ClosePortalStmt *stmt = (ClosePortalStmt *)parsetree;

	    commandTag = "CLOSE";
	    CHECK_IF_ABORTED();
	
	    PerformPortalClose(stmt->portalname, dest);
	}
	break;
      
    case T_FetchStmt:
	{
	    FetchStmt *stmt = (FetchStmt *)parsetree;
	    char *portalName = stmt->portalname;
	    bool forward;
	    int	count;

	    commandTag = "FETCH";
	    CHECK_IF_ABORTED();

	    forward = (bool)(stmt->direction == FORWARD);

	    /* parser ensures that count is >= 0 and 
	       'fetch ALL' -> 0 */
	       
	    count = stmt->howMany;
	    PerformPortalFetch(portalName, forward, count, commandTag, dest);
	}
	break;
      
	/* ********************************
	 *	relation and attribute manipulation
	 * ********************************
	 */
    case T_CreateStmt:
	commandTag = "CREATE";
	CHECK_IF_ABORTED();
      
	DefineRelation((CreateStmt *)parsetree);
	break;
      
    case T_DestroyStmt:
	{
	    DestroyStmt *stmt = (DestroyStmt *)parsetree;
	    List *arg;
	    List *args = stmt->relNames;

	    commandTag = "DROP";
	    CHECK_IF_ABORTED();

	    foreach (arg, args) {
		relname = strVal(lfirst(arg));
		if (NameIsSystemRelationName((Name)relname))
		    elog(WARN, "class \"%-.*s\" is a system catalog",
			 NAMEDATALEN, relname);
#ifndef NO_SECURITY
		if (!pg_ownercheck(userName->data, relname, RELNAME))
		    elog(WARN, "you do not own class \"%-.*s\"",
			 NAMEDATALEN, relname);
#endif
	    }
	    foreach (arg, args) {
		relname = strVal(lfirst(arg));
		RemoveRelation((Name)relname);
	    }
	}
	break;
      
    case T_PurgeStmt:
	{
	    PurgeStmt *stmt = (PurgeStmt *)parsetree;

	    commandTag = "PURGE";
	    CHECK_IF_ABORTED();
	    
	    RelationPurge((Name)stmt->relname,
			  stmt->beforeDate, /* absolute time string */
			  stmt->afterDate); /* relative time string */
	}
	break;
      
    case T_CopyStmt:
	{
	    CopyStmt *stmt = (CopyStmt *)parsetree;
	    String	filename;
	    bool	isBinary;
	    bool	isFrom;
	    bool        pipe = false;

	    commandTag = "COPY";
	    CHECK_IF_ABORTED();
	    
	    relname = stmt->relname;
	    isBinary = stmt->binary;
	    
	    isFrom = (bool)(stmt->direction == FROM);
	    filename = stmt->filename;
	    
#ifndef NO_SECURITY
	    if (isFrom) {
		if (!pg_aclcheck(relname, userName->data, ACL_RD))
		    elog(WARN, "read on \"%-.*s\": permission denied",
			 NAMEDATALEN, relname);
	    } else {
		if (!pg_aclcheck(relname, userName->data, ACL_WR))
		    elog(WARN, "write on \"%-.*s\": permission denied",
			 NAMEDATALEN, relname);
	    }
#endif
	    
	    /* Free up file descriptors - going to do a read... */
	    
	    closeOneVfd();

	    /*
	     * use stdin/stdout if filename is null.
	     */
	    if (filename == NULL)
		pipe = true;
	    
	    if (pipe && IsUnderPostmaster) dest = CopyEnd;
	    
	    DoCopy(relname, isBinary, isFrom, pipe, filename);
	}
	break;
      
    case T_AddAttrStmt:
	{
	    AddAttrStmt *stmt = (AddAttrStmt *)parsetree;
	    NameData    relnameData;

	    commandTag = "ADD";
	    CHECK_IF_ABORTED();
	
	    /* owner checking done in PerformAddAttribute (now recursive) */
	    relname = stmt->relname;
	    namestrcpy(&relnameData,relname);
	    PerformAddAttribute(&relnameData,
				userName,
				stmt->inh,
				stmt->colDef);
	}
	break;
      
	/*
	 * schema
	 */
    case T_RenameStmt:
	{
	    RenameStmt *stmt = (RenameStmt *)parsetree;

	    commandTag = "RENAME";
	    CHECK_IF_ABORTED();
	
	    relname = stmt->relname;
	    if (NameIsSystemRelationName((Name)relname))
		elog(WARN, "class \"%-.*s\" is a system catalog",
		     NAMEDATALEN, relname);
#ifndef NO_SECURITY
	    if (!pg_ownercheck(userName->data, relname, RELNAME))
		elog(WARN, "you do not own class \"%-.*s\"",
		     NAMEDATALEN, relname);
#endif
	    
	    /* ----------------
	     *	XXX using len == 3 to tell the difference
	     *	    between "rename rel to newrel" and
	     *	    "rename att in rel to newatt" will not
	     *	    work soon because "rename type/operator/rule"
	     *	    stuff is being added. - cim 10/24/90
	     * ----------------
	     * [another piece of amuzing but useless anecdote -- ay]
	     */
	    if (stmt->column == NULL) {
		/* ----------------
		 *	rename relation
		 *
		 *	Note: we also rename the "type" tuple
		 *	corresponding to the relation.
		 * ----------------
		 */
		renamerel(relname, /* old name */
			  stmt->newname); /* new name */
		TypeRename(relname, /* old name */
			   stmt->newname); /* new name */
	    } else {
		/* ----------------
		 *	rename attribute
		 * ----------------
		 */
		renameatt(relname, /* relname */
			  stmt->column, /* old att name */
			  stmt->newname, /* new att name */
			  userName,
			  stmt->inh); /* recursive? */
	    }
	}
	break;
      
    case T_ChangeACLStmt:
	{
	    ChangeACLStmt *stmt = (ChangeACLStmt *)parsetree;
	    List *i;
	    AclItem *aip;
	    unsigned modechg;

	    commandTag = "CHANGE";
	    CHECK_IF_ABORTED();
	    
	    aip = stmt->aclitem;
	    modechg = stmt->modechg;
#ifndef NO_SECURITY
	    foreach (i, stmt->relNames) {
		relname = strVal(lfirst(i));
		if (!pg_ownercheck(userName->data, relname, RELNAME))
		    elog(WARN, "you do not own class \"%-.*s\"",
			 NAMEDATALEN, relname);
	    }
#endif
	    foreach (i, stmt->relNames) {
		relname = strVal(lfirst(i));
		ChangeAcl(relname, aip, modechg);
	    }

	}
	break;
      
	/* ********************************
	 *	object creation / destruction
	 * ********************************
	 */
    case T_DefineStmt:
	{
	    DefineStmt *stmt = (DefineStmt *)parsetree;

	    commandTag = "CREATE";
	    CHECK_IF_ABORTED();

	    switch(stmt->defType) {
	    case OPERATOR:
		DefineOperator((Name)stmt->defname, /* operator name */
			       stmt->definition); /* rest */
		break;
	    case P_TYPE:
		{
		    NameData typeNameData;
		    namestrcpy(&typeNameData, stmt->defname);
		    DefineType (&typeNameData, stmt->definition);
		}
		break;
	    case AGGREGATE:
		DefineAggregate((Name)stmt->defname, /*aggregate name */
				stmt->definition); /* rest */
		break;
	    }
	}
	break;
      
    case T_ViewStmt:		/* VIEW */
	{
	    ViewStmt *stmt = (ViewStmt *)parsetree;
	    NameData viewNameData;

	    commandTag = "CREATE";
	    CHECK_IF_ABORTED();
	    namestrcpy(&viewNameData, stmt->viewname);
	    DefineView (&viewNameData, stmt->query); /* retrieve parsetree */
	}
	break;
      
    case T_ProcedureStmt:	/* FUNCTION */
	commandTag = "CREATE";
	CHECK_IF_ABORTED();
	DefineFunction((ProcedureStmt *)parsetree, dest); /* everything */
	break;
      
    case T_IndexStmt:
	{
	    IndexStmt *stmt = (IndexStmt *)parsetree;

	    commandTag = "CREATE";
	    CHECK_IF_ABORTED();
	    /* XXX no support for ARCHIVE indices, yet */
	    DefineIndex((Name)stmt->relname, /* relation name */
			(Name)stmt->idxname, /* index name */
			(Name)stmt->accessMethod, /* am name */
			stmt->indexParams, /* parameters */
			stmt->withClause,
			(Expr*)stmt->whereClause,
			stmt->rangetable);
	}
	break;
      
    case T_RuleStmt:
	{
	    RuleStmt *stmt = (RuleStmt *)parsetree;
#if 0
#ifndef NO_SECURITY
	    relname = stmt->relname; /* XXX object? */
	    if (!pg_aclcheck(relname, userName->data, ACL_RU))
		elog(WARN, "define rule on \"%-.*s\": permission denied",
		     NAMEDATALEN, relname);
#endif
#endif
	    commandTag = "CREATE";
	    CHECK_IF_ABORTED();
	    DefineQueryRewrite(stmt);
	}
	break;
      
    case T_ExtendStmt:
	{
	    ExtendStmt *stmt = (ExtendStmt *)parsetree;

	    commandTag = "EXTEND";
	    CHECK_IF_ABORTED();
	
	    ExtendIndex((Name)stmt->idxname, /* index name */
			(Expr*)stmt->whereClause, /* where */
			stmt->rangetable);
	}
	break;
      
    case T_RemoveStmt:
	{
	    RemoveStmt *stmt = (RemoveStmt *)parsetree;

	    commandTag = "DROP";
	    CHECK_IF_ABORTED();
	
	    switch(stmt->removeType) {
	    case AGGREGATE:
		RemoveAggregate((Name)stmt->name);
		break;
	    case INDEX:
		relname = stmt->name;
		if (NameIsSystemRelationName((Name)relname))
		    elog(WARN, "class \"%-.*s\" is a system catalog index",
			 NAMEDATALEN, relname);
#ifndef NO_SECURITY
		if (!pg_ownercheck(userName->data, relname, RELNAME))
		    elog(WARN, "you do not own class \"%-.*s\"",
			 NAMEDATALEN, relname);
#endif
		RemoveIndex((Name)relname);
		break;
	    case RULE:
		{
		    NameData ruleNameData;
		    String rulename = stmt->name;
#ifndef NO_SECURITY
		
		    relationName = RewriteGetRuleEventRel(rulename);
		    if (!pg_aclcheck(relationName->data, userName->data, ACL_RU))
			elog(WARN, "remove rule on \"%-.*s\": permission denied",
			     NAMEDATALEN, relationName->data);
#endif
		    namestrcpy(&ruleNameData,rulename);
		    RemoveRewriteRule(&ruleNameData);
		}
		break;
	    case P_TYPE:
#ifndef NO_SECURITY
		/* XXX moved to remove.c */
#endif
		RemoveType((Name)stmt->name);
		break;
	    case VIEW:
		{
		    String viewName = stmt->name;
		    NameData ruleName;
		    NameData viewNameData;
		    extern Name RewriteGetRuleEventRel();

		    namestrcpy(&viewNameData,viewName);
#ifndef NO_SECURITY
		
		    makeRetrieveViewRuleName(&ruleName, &viewNameData);
		    relationName = RewriteGetRuleEventRel(ruleName.data);
		    if (!pg_ownercheck(userName->data, relationName->data, RELNAME))
			elog(WARN, "remove view \"%-.*s\": permission denied",
			     NAMEDATALEN, relationName);
#endif
		    RemoveView(&viewNameData);
		}
		break;
	    }
	    break;
	}
	break;
    case T_RemoveFuncStmt:
	{
	    RemoveFuncStmt *stmt = (RemoveFuncStmt *)parsetree;
	    commandTag = "DROP";
	    CHECK_IF_ABORTED();
	    RemoveFunction((Name)stmt->funcname,
			   length(stmt->args),
			   stmt->args);
	}
	break;
      
    case T_RemoveOperStmt:
	{
	    RemoveOperStmt *stmt = (RemoveOperStmt *)parsetree;
	    String type1 = (String) NULL, type2 = (String) NULL;
		
	    commandTag = "DROP";
	    CHECK_IF_ABORTED();

	    if (lfirst(stmt->args)!=NULL)
		type1 = strVal(lfirst(stmt->args));
	    if (lsecond(stmt->args)!=NULL)
		type2 = strVal(lsecond(stmt->args));
	    RemoveOperator((Name)stmt->opname, (Name)type1, (Name)type2);
	}
	break;
      
    case T_VersionStmt:
	{
	    VersionStmt *stmt = (VersionStmt *)parsetree;

	    commandTag = "VERSION";
	    CHECK_IF_ABORTED();
	    if (stmt->direction == FORWARD)
		CreateVersion((Name)stmt->relname,
			      stmt->fromRelname, stmt->date);
#ifdef NOTYET
	    else
		CreateBackwardDelta((Name)stmt->relname,
				    stmt->fromRelname, stmt->date);
#endif		
	}
	break;
      
    case T_CreatedbStmt:
	{
	    CreatedbStmt *stmt = (CreatedbStmt *)parsetree;

	    commandTag = "CREATEDB";
	    CHECK_IF_ABORTED();
	    createdb(stmt->dbname);
	}
	break;
      
    case T_DestroydbStmt:
	{
	    DestroydbStmt *stmt = (DestroydbStmt *)parsetree;

	    commandTag = "DESTROYDB";
	    CHECK_IF_ABORTED();
	    destroydb(stmt->dbname);
	}
	break;
      
	/* Query-level asynchronous notification */
    case T_NotifyStmt:
	{
	    NotifyStmt *stmt = (NotifyStmt *)parsetree;

	    commandTag = "NOTIFY";
	    CHECK_IF_ABORTED();

	    Async_Notify(stmt->relname);
	}
	break;
      
    case T_ListenStmt:
	{
	    ListenStmt *stmt = (ListenStmt *)parsetree;

	    commandTag = "LISTEN";
	    CHECK_IF_ABORTED();

	    Async_Listen(stmt->relname,MasterPid);
	}
	break;
      
	/* ********************************
	 *	dynamic loader
	 * ********************************
	 */
    case T_LoadStmt:
	{
	    LoadStmt *stmt = (LoadStmt *)parsetree;
	    FILE *fp, *fopen();
	    char *filename;

	    commandTag = "LOAD";
	    CHECK_IF_ABORTED();

	    filename = stmt->filename;
	    closeAllVfds();
	    if ((fp = fopen(filename, "r")) == NULL)
		elog(WARN, "LOAD: could not open file %s", filename);
	    fclose(fp);
	    load_file(filename);
	}
	break;

    case T_ClusterStmt:
	{
	    ClusterStmt *stmt = (ClusterStmt *)parsetree;

	    commandTag = "CLUSTER";
	    CHECK_IF_ABORTED();

	    cluster(stmt->relname, stmt->indexname, NULL);
	}
	break;
	
    case T_VacuumStmt:
	commandTag = "VACUUM";
	CHECK_IF_ABORTED();
	vacuum(((VacuumStmt *) parsetree)->vacrel);
	break;
      
	/* ********************************
	   Tioga-related statements
	   *********************************/
    case T_RecipeStmt:
	{
	    RecipeStmt* stmt = (RecipeStmt*)parsetree;
	    commandTag="EXECUTE RECIPE";
	    CHECK_IF_ABORTED();
	    beginRecipe(stmt);
	}
	break;
      
      
	/* ********************************
	 *	default
	 * ********************************
	 */
    default:
	elog(WARN, "ProcessUtility: command #%d unsupported",
	     nodeTag(parsetree));
	break;
    }
    
    /* ----------------
     *	tell fe/be or whatever that we're done.
     * ----------------
     */
    EndCommand(commandTag, dest);
}

