/*-------------------------------------------------------------------------
 *
 * pgcheck_index_fast.c
 *	  integrity control tool for indexes
 *
 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 * Portions Copyright (c) 2007, Robert Mach (mach.robert@gmail.com)
 *
 *  Copyright (c) <YEAR>, <OWNER>
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice, this
 *    list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 *
 *
 *
 * INTERFACE ROUTINES
 *		pgcheck_index_fast	- Does comparion between number of items in given relation 
 *							  and number of items in its indexes. Pressumption is, that 
 *                            counts of those number should be the same.
 *
 * NOTES
 *	  
 *
 *-------------------------------------------------------------------------
 */


#include "postgres.h"
#include <string.h>
#include <ctype.h>
#include "fmgr.h"
#include "access/heapam.h"
#include "utils/syscache.h"
#include "utils/relcache.h"
#include "access/genam.h"



#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

PG_FUNCTION_INFO_V1(pgcheck_index_fast);

/*
 * get_item_count_heap
 *
 *	This function returns number of valid items in given heap.
 *
 */
int
get_item_count_heap(Relation relation)
{
	HeapScanDesc 	scan;
	HeapTuple		tuple;
	int 			items_count = 0;

	/* begin scanning of heap*/
	scan = heap_beginscan (relation,SnapshotNow, 0, NULL);
   
    
	/*check the validity of scan*/
	if (!HeapScanIsValid(scan)) 
		ereport(ERROR,
				(errmsg("pgcheck_index_fast didn't succeed with start of heap scan of relation (%d)",relation->rd_id)));

	/*get new tuples until there are some*/
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        items_count++;	
	
	/* end of heap scanning*/
	heap_endscan(scan);	

	return(items_count);
}

/*
 * get_item_count_index
 *
 *	This function retuns number of item in this index. So far this relation works only
 *	for b-tree indexes. If given index is some other then b-tree function returns 0.
 *	
 */
int
get_item_count_index(Oid index_oid,Relation indexed_rel)
{
	Relation		index_rel;
	IndexScanDesc	indexscan;
	HeapTuple		tuple;
	int 			items_count = 0;		//number of items in this index
	
	/* open index */
	index_rel=index_open(index_oid,AccessShareLock);

	if (!RelationIsValid(index_rel)) 
		ereport(ERROR,
				(errmsg("could not open index with OID %u", index_oid)));
	
	/* so far, this function works only for b-tree indexes*/
	if((index_rel)->rd_rel->relam != BTREE_AM_OID) 
	{
		ereport (WARNING,
				 (errmsg("index (%d) in not b-tree index",index_oid)));
		return 0;
	}

	/* begin index scan */
	indexscan = index_beginscan(indexed_rel,index_rel,SnapshotNow,0,(ScanKey) NULL);
	
	/* count the items */
	while ((tuple=index_getnext(indexscan,ForwardScanDirection))!=NULL)
				items_count++;
	
	/* clean up */
	index_endscan(indexscan);
	index_close(index_rel,AccessShareLock);

	return(items_count);
}

/*
 * pgcheck_index_fast
 *
 *	This function count number of items in given relation and compare it with number of
 *	items in its indexes. It returns WARNING if number are not equal (= corrupted data).
 *	So far, it work only of b-tree indexes.
 */
Datum
pgcheck_index_fast(PG_FUNCTION_ARGS)
{
	Oid				relation_oid = PG_GETARG_OID(0);
	Oid				index_oid;
	Relation 	 	relation;	
	char			*relation_name;
	List			*list_of_indexes;			//list of indexes
	ListCell   		*cell;
	int				num_of_items_heap = 0;		//number of items in heap
	int				num_of_items_index = 0;		//number of items in index


	/* get the name of relation */
	relation_name = (char *) get_rel_name(relation_oid);

	/* does given oid exists? */	
	if (relation_name == NULL)
		ereport(ERROR,
				(errmsg("could not find item with oid %d in pg_class",relation_oid)));
	
	ereport(NOTICE,
			(errmsg("name of tested relation: '%s'",relation_name)));

	/* open relation */
	relation=relation_open(relation_oid,AccessShareLock);	
	
	/* loaded properly?*/
	if (!relation->rd_isvalid) 
		ereport(ERROR,
				(errmsg("tested relation with oid = %d wasn't loaded properly",relation_oid)));

	/* get the list of indexes */
	list_of_indexes = RelationGetIndexList(relation);
	
	if (get_rel_relkind(relation_oid) == RELKIND_INDEX)
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("given oid (%d) is index",relation_oid),
					 errdetail("oid of relation who's indexes should be tested is reqired")));
	
	/* does this relaion has an index? */
	if (list_of_indexes == NIL)
		ereport(ERROR,
				(errmsg("relation %s doesn't have index",relation_name)));
	
	else
	{
		/* write out number of indexes of given relation */
 		ereport(NOTICE,
				(errmsg("relation '%s' (%d) has %d indexes",relation_name,relation_oid,list_of_indexes->length)));		
		
		/* get number of item in given relaion */
		num_of_items_heap = get_item_count_heap(relation);
		ereport(DEBUG1,
				(errmsg("number of items in heap '%s' (%d) = %d",relation_name,relation_oid,num_of_items_heap)));
		
		/* check each index of given relation */
		foreach(cell,list_of_indexes)
		{
			index_oid=lfirst_oid(cell);	
			
			/* is it really index */
			if (get_rel_relkind(index_oid) != RELKIND_INDEX)
				ereport(ERROR,
						(errcode(ERRCODE_DATA_CORRUPTED),
						 errmsg("found oid (%d) is not index",index_oid)));

			/* get number of items in index */
			num_of_items_index = get_item_count_index(index_oid,relation);
			ereport(DEBUG1,
					(errmsg("number of items in index '%s' (%d) = %d",get_rel_name(index_oid),relation_oid,num_of_items_index)));

			/* compare number of items */
			if (num_of_items_heap == num_of_items_index)
				ereport(NOTICE,
						(errmsg("numbers of items relation '%s'(%d) and in it's index '%s'(%d) are the same (%d=%d)",
							relation_name,relation_oid,get_rel_name(index_oid),index_oid,num_of_items_heap,num_of_items_index)));
			else
				ereport(WARNING,
						(errcode(ERRCODE_DATA_CORRUPTED),
						 errmsg("relation '%s'(%d) and its index '%s'(%d) don't have the same number of items (%d != %d)",
							relation_name,relation_oid,get_rel_name(index_oid),index_oid,num_of_items_heap,num_of_items_index)));
		}
	}
	
			

	/* Clean up*/
	list_free(list_of_indexes);
	relation_close(relation,AccessShareLock);

	PG_RETURN_INT32(num_of_items_heap);
}