/*
 * Written by Tomas Dosoudil <tomas.dosoudil@gmail.com>
 * This file is distributed under BSD-style license.
 *
 */

#include	<postgres.h>
#include	<fcntl.h>
#include	<unistd.h>
#include	<sys/stat.h>
#include	<storage/bufpage.h>
#include	<access/htup.h>
#include	"pgdview.h"
#include	"input_parser.h"
#include	"output.h"


typedef struct filedesqueue
{
	uint32			fileId;		/* number of file */
	int				fr;			/* open file for read */
	struct filedesqueue	*next;
	struct filedesqueue	*prev;
} FileDesQueue;

typedef struct
{
	uint8			nfile;
	FileDesQueue	*head;
	FileDesQueue	*tail;
} FileDesHeader;

typedef struct
{
	char			*optPath;			/* database path */

	LinkedList		*optPage;
	uint8			optPageHeader;
	LinkedList		*optItemList;
	uint8			optTupleHeader;
	uint8			optTupleData;
	uint8			optFollowCtid;
} InputValue;

typedef struct
{
	char 			*fileName;
	off_t			fileSize;
	uint8			fileSplit;
	uint32			pageSize;
	off_t			pageOffset;
	uint8			*page;
	char			*dbPath;
} ConfigValue;

	

/* initializacion */
int		InitValues(void);
char*	DatabasePathValidation(char*);
int		Clean(void);

/* files and pages */
int		GetPage(int);
int		GetFileDescriptor(int, char*);
int		AddFileDescriptor(int, char*);
int		RemoveFileDescriptor(void);

/* process page, tuple, itemlist, header */
int		ProcessPage(void);
void	ProcessPageInfo(int, uint32);
void	ProcessPageHeader(PageHeader);
void	ProcessItemList(uint32, uint32);
void 	ProcessTuple(HeapTupleHeader, uint32, uint32);
void	ProcessCheckRange(int*, int*, LinkedList*, int);
int		FollowCtid(void);

/* get data from page */
int				OpenFile(char*);
int				ReadPage(int, uint8*, int, off_t);
uint32			_PageGetPageSize(int);
PageHeader		GetPageHeader(uint8*);
int				ItemsCount(int pLower);
HeapTuple		GetHeapTuple(uint8*, uint16);
HeapTupleHeader	GetHeapTupleHeader(uint8*, uint16);

/* global variables */
InputValue		options;			/* input options */
ConfigValue		config;				/* config value */
FileDesHeader	Fdh;				/* fd header */

int
main(int argc, char *argv[])
{
	int 		optc = argc;		
	char 		**optv = argv;
	int			ret;

	/* ./progname */
	if (argc == 1) {
		fprintf(stderr, "Database file must be set\n");
		Usage();
		return 1;
	}

	/* only help is required */
	if (strcmp(*(argv + 1), "-h") == 0) {
		Usage();
		return 0;
	}

	/* ./progname -parameter */
	if (argc == 2 && **(optv + 1) == '-') {
		fprintf(stderr, "Database file must be set\n");
		Usage();
		return 1;
	}

	/* over program name */
	optv++;
	optc--;

	/* parse each argument */
	while (--optc > 0) {

		/* first character is dash. if not it should be invalid input */
		if(**optv == '-') {
			(*optv)++;
		}
		else {
			fprintf(stderr, "Invalid input: %s\n", *optv);
			Usage();
			return 1;
		}

		while (**optv) {
			/* page */
			if (**optv == 'p') {
				if (HAS_ARGUMENT(optc, optv)) {
					options.optPage = GetLinkedList(*(optv + 1), 0);
					if (options.optPage == NULL) {
						fprintf(stderr, "Stoped in -p\n");
						return 1;
					}
					optv++;
					optc--;
					break;
				}	
				else {
					options.optPage = AllocateNewNode();
					if (options.optPage == NULL)
						return 1;

					options.optPage->from = 0;
					options.optPage->to = 0;
				}			
			}
			/* page header */
			else if (**optv == 'c') {
				options.optPageHeader = 1;	
			}
			/* item list */
			else if (**optv == 'i') {
				if (HAS_ARGUMENT(optc, optv)) {
					options.optItemList = GetLinkedList(*(optv + 1), 1);
					if (options.optItemList == NULL) {
						fprintf(stderr, "Stoped in -i\n");
						return 1;
					}
					optv++;
					optc--;
					break;
				}	
				else {
					options.optItemList = AllocateNewNode();
					if (options.optItemList == NULL)
						return 1;

					options.optItemList->from = 1;
					options.optItemList->to = 0;
				}	
			}
			/* tuple header */
			else if (**optv == 't') {
				options.optTupleHeader = 1;	
			}
			else if (**optv == 'd') {
				options.optTupleData = 1;	
			}
			/* database path */
			else if (**optv == 'D') {
				if (HAS_ARGUMENT(optc, optv)) {
					options.optPath = (char*) *(optv + 1);
					optv++;
					optc--;
					break;
				}
				else {
					fprintf(stderr, "Set database path following table_oid\n");
					Usage();
					return 1;
				}
			}
			/* follow ctid */
			else if (**optv == 'f') {
				options.optFollowCtid = 1;
			}
			/* help */
			else if (**optv == 'h') {
				Usage();
				return 0;
			}
			else {
				fprintf(stderr, "Unknown option: %c\n", **optv);
				Usage();
				return 1;
			}
			(*optv)++;	/* next character */
		}
		optv++;	/* next argument */
	}

	/* file is set directly without option -D */
	if (options.optPath == NULL) {
		config.dbPath = "";
	}
	else {
		config.dbPath = DatabasePathValidation(options.optPath);
		config.fileSplit = 1;
	}

	config.fileName = *optv;


	/* do not accept parameter -t or -d without parameter -i */
	if (!options.optItemList && (options.optTupleHeader || options.optTupleData)) {
		fprintf(stderr, "Invalid syntax: Set item list\n");
		Usage();
		return 1;
	}
	

	ret = InitValues();
	if (ret == 0) {
		if (options.optFollowCtid && options.optPage && options.optItemList) {
			FollowCtid();
		}
		/* show data */
		else if (options.optPage || options.optPageHeader || options.optItemList) {
			ret = ProcessPage();
			if (ret != 0) {
				return 1;
			}
		}
	}

	ret = Clean();
	if (ret == -1)
		return 1;

	return 0;
}

/* 
 * chect input value and set defaults
 */
int
InitValues(void)
{	
	int			fr;
	int 		ret;
	char		absName[MAX_FILENAME_LENGTH];	/* abs. path and name of file */
	struct stat	statBuf;

	/*
	 * File init
	 * get file size
	 */
	/* file could be splited */
	if (config.fileSplit == 1) 
		snprintf(absName, MAX_FILENAME_LENGTH, "%s%s.0", config.dbPath, config.fileName);
	else
		snprintf(absName, MAX_FILENAME_LENGTH, "%s%s", config.dbPath, config.fileName);

	/* file size */
	ret = stat(absName, &statBuf);
	if (ret == -1) {
		fprintf(stderr, "Can not detect file size of: %s\n", absName);
		fprintf(stderr, "%s\n", strerror(errno));
		return -1;
	}
	config.fileSize = statBuf.st_size;

	/* Postgre makes empty file */
	if (config.fileSize == 0) {
		fprintf(stderr, "File is empty: %s\n", config.fileName);
		return -1;
	}

	fr = open(absName, O_RDONLY, S_IRWXU);
    if (fr == -1) {
       	fprintf(stderr, "Can not open file: %s\n", absName);
		fprintf(stderr, "%s\n", strerror(errno));
		return -1;
    }


	/* 
	 * Page init
	 */
	config.pageSize = _PageGetPageSize(fr);
	if (config.pageSize == 0) {
		fprintf(stderr, "Page size is 0 - it is probably incorrect db file\n");
		return -1;
	}

	ret = close(fr);
	if (ret == -1) {
		fprintf(stderr, "Can not close file: %s\n", absName);
		fprintf(stderr, "%s\n", strerror(errno));
		return -1;
	}
	
	/* allocate memory for page */
	config.page = (uint8*) malloc(config.pageSize);
	if (config.page == NULL) {
		fprintf(stderr, "Can not allocate memory (%dB) for page.\n", config.pageSize);
		fprintf(stderr, "%s\n", strerror(errno));
		return -1;
	}

	/* if u want follow ctid you must use -i (item list) parameter */
	if (options.optFollowCtid == 1) {
		if (options.optItemList == NULL) {
			fprintf(stderr, "Use -i with follow ctid\n");
			return -1;
		}
	}

	/* page must be set. default is first page */ 
	if (options.optPage == NULL) {
		options.optPage = AllocateNewNode();
		if (options.optPage == NULL) {
			perror("Can not allocate memory\n");
			return -1;
		}
		options.optPage->from = 0;
	}

	/* if y want follow ctid you must enter number of page(default is first) and item on page */
	if (options.optFollowCtid == 1) {
		if (options.optPage->next != NULL || options.optPage->to != -1) {
			fprintf(stderr, "Number of one page is required\n");
			return -1;
		}
		if (options.optItemList->next != NULL || options.optItemList->to != -1) {
			fprintf(stderr, "Number of one item is required\n");
			return -1;
		}
	}

	return 0;
}

/*
 * check last character in path
 * - character must be slash
 *
 * function returns path in correct form 
 */
char*
DatabasePathValidation(char *path)
{
	int		pathLen;
	char	*newPath;
	int		newPathLen;

	pathLen = strlen(path);
	newPathLen = pathLen + 2;
	if (path[pathLen - 1] != '/') {
		newPath = (char*) malloc(sizeof(char) * newPathLen);
		if (newPath == NULL) {
			perror("Error malloc newPath");
			Clean();
			exit(1);
		}
		
		snprintf(newPath, newPathLen, "%s/", path);

		path = newPath;

	}

	return path;
}
/*
 * Clean memory and opened files
 */
int
Clean(void)
{
	int		ret = 0;
	int		gret = 0;

	/* close files and deallocate */
	while (Fdh.nfile > 0) {
		ret = RemoveFileDescriptor();
		gret = (gret == -1)? -1 : ret;	/* remmember error but continue */
	}

	if (options.optPage != NULL)
		FreeLinkedList(&(options.optPage));

	if (options.optItemList != NULL)
		FreeLinkedList(&(options.optItemList));

	if (config.page != NULL)
		free(config.page);

	return gret;
}

/* 
 * return number of file or -1
 */
int
GetPage(int numberOfPage)
{
	int				fr;
	char			absName[MAX_FILENAME_LENGTH];
	int				numberOfFile;
	int				ret;

	/* count number of next file */
	numberOfFile = (numberOfPage * config.pageSize) / config.fileSize;

	/* file name */
	if (numberOfFile == 0 && config.fileSplit == 0) 
		snprintf(absName, MAX_FILENAME_LENGTH, "%s%s",
				config.dbPath, config.fileName);
	else
		snprintf(absName, MAX_FILENAME_LENGTH, "%s%s.%d",
				config.dbPath, config.fileName, numberOfFile);


	fr = GetFileDescriptor(numberOfFile, absName);
	if (fr == -1)
		return -1;

	/* read page */
	config.pageOffset = (off_t) (numberOfPage * config.pageSize) % config.fileSize;
	ret = ReadPage(fr, config.page, config.pageSize, config.pageOffset);
	
	/* check how many bytes it reads */
	if (ret != config.pageSize) 
		return -1;

	return numberOfFile;
}

int
GetFileDescriptor(int numberOfFile, char *absName)
{
	FileDesQueue			*FdItem;
	int						counter;
	int 					ret;

	#if FD_DEBUG
	printf("FDBG>filename: %s\n", absName);
	#endif

	if (Fdh.nfile != 0) {
		FdItem = Fdh.head;
		/* find file in queue */
		for (counter = 0; counter < Fdh.nfile; counter++) {
			if (FdItem->fileId == numberOfFile) {
				#if FD_DEBUG
				printf("FDBG>is in fd(%d)\n",FdItem->fileId);
				#endif

				return FdItem->fr;
			}
			else {
				FdItem = FdItem->prev;
				if (FdItem == NULL)
					break;
			}
		}

		if (FdItem == NULL) {
			#if FD_DEBUG
			printf("FDBG>fd not found\n");
			#endif

			ret = AddFileDescriptor(numberOfFile, absName);
			if (ret != 0)
				return -1;
		}
			
		return (Fdh.head)->fr;
	}
	/* queue is empty */
	else {
		#if FD_DEBUG
		printf("FDBG>queue is empty\n");
		#endif

		ret = AddFileDescriptor(numberOfFile, absName);
		if (ret == -1)
			return -1;

		return (Fdh.head)->fr;
	}

	return -1;
}

int
AddFileDescriptor(int numberOfFile, char *absName)
{
	FileDesQueue		*FdItem;

	/* remove last fd */
	if (Fdh.nfile == MAX_FD_OPENED) {
		RemoveFileDescriptor();
	}

	FdItem = (FileDesQueue*) malloc(sizeof(FileDesQueue));
	if (FdItem == NULL) {
		perror("Out of memory\n");
		Clean();
		exit(1);
	}

	FdItem->fileId = numberOfFile;
	FdItem->fr = OpenFile(absName);
	if (FdItem->fr == -1) {
		/* without any warning messages */
		free(FdItem);
		return -1;
	}

	/* queue is empty */
	if (Fdh.head == NULL) {
		Fdh.tail = FdItem;  /* tail = head*/
	}
	else {
		FdItem->prev = Fdh.head;
		(Fdh.head)->next = FdItem;
	}

	Fdh.head = FdItem;
	Fdh.nfile++;

	#if FD_DEBUG
	printf("FDBG>add fd(%d)\n", FdItem->fileId);
	#endif

	return 0;
}

int
RemoveFileDescriptor(void)
{
	FileDesQueue		*FdItem;
	int					ret = 0;

	FdItem = Fdh.tail;
	Fdh.tail = FdItem->next;

	#if FD_DEBUG
	printf("FDBG>remove fd(%d)\n", FdItem->fileId);
	#endif

	/* when tail = head */
	if (Fdh.tail != NULL)
		(Fdh.tail)->prev = NULL;

	Fdh.nfile--;
	ret = close(FdItem->fr);
	if (ret == -1) {
		fprintf(stderr, "Can not close file with number %d\n", FdItem->fileId);
		fprintf(stderr, strerror(errno));
		ret = -1;
	}

	free(FdItem);

	return ret;
}

/*
 * ProcessPage run over page and jump to function which works with 
 * itemlist, tuple.. if necessary
 * upon succssesful completion return 0, othrewise -1
 */
int
ProcessPage(void)
{
	PageHeader	pageHeader;
	LinkedList	*range;
	int			ret;	
	int 		numberOfFile = -2;	/* 0 is first file, -1 is error */
	int			numberOfPage = 0;
	int			condition;
	uint32		itemsCount;

	range = options.optPage;

	do {			
		numberOfPage = range->from;
		condition = 1;			/* set true */

		/* process one by one range in input list */
		while (condition) {			
			ret = GetPage(numberOfPage);

			if (ret == -1) {
				/* 
				 * i used full range
				 * last GetPage try to open file which does not exist
				 */
				if (range->to == 0) {
					return 0;
				}
				/* when i use wrong range - program must end with error*/
				else {
					fprintf(stderr, "Page number %d is out of range\n", numberOfPage);
					return -1;
				}

			}
			
			/*
			 * I used file name directly without parameter -D but GetPage returns 
			 * number greater than zero. It means GetPage tries to load page from next file.
			 * It means out of range
			 */
			if (ret > 0 && options.optPath == NULL) {
				if (range->to == 0) {
					return 0;
				}
				else {
					fprintf(stderr, "Page number %d is out of range\n", numberOfPage);
					return -1;
				}
			}

			/* print file name if is diferent of previous */
			if (numberOfFile != ret && options.optPath != NULL) {
				numberOfFile = ret;
				PrintFileName(config.fileName, numberOfFile);
			}

			pageHeader = GetPageHeader(config.page);			/* get page header */
			itemsCount = ItemsCount(pageHeader->pd_lower);		/* count item pointer */

			/* process page info */
			ProcessPageInfo(numberOfPage, itemsCount);

			/* process other parameters */
			if (options.optPageHeader == 1) 
				ProcessPageHeader(pageHeader);

			if (options.optItemList != NULL)
				ProcessItemList(itemsCount, numberOfPage);

			/* 
			 * CHECK CONDITION
			 */
			numberOfPage++;

			/* show one page only ... from was set, to was not set */
			if (range->to == -1)
				break;

			/* 
			 * we can use numberOfPage 'i' only if range 'to' was set. it means 
			 * it must be greater then 0 
			 */
			if (range->to > 0) {
				if (numberOfPage > range->to)
					condition = 0;
			}
		}		
	
	} while ((range = range->next));
	
	return 0;
}

/*
 * print page information. 
 * it shows number of page, tuples (total, used, deleted) and free space in page
 */
void 
ProcessPageInfo(int numberOfPage, uint32 itemsCount)
{
	ItemId 		itId;
	uint32		itemsUsed = 0;
	uint32		itemsDelete = 0;
	Size		freeSpace = 0;
	int 		i;

	/* count tuples */
	for (i = 0; i < itemsCount; i++) {
		itId = PageGetItemId(config.page, (i + 1));
		if (ItemIdGetFlags(itId) == LP_USED)
			itemsUsed++;
		else if (ItemIdGetFlags(itId) == LP_DELETE)
			itemsDelete++;
	}

	/* get free space */
	freeSpace = PageGetFreeSpace((Page)config.page);

	/* print info */
	PrintPageInfo(numberOfPage, itemsCount, itemsUsed, itemsDelete, freeSpace);	
}


/*
 * print page header
 */
void 
ProcessPageHeader(PageHeader pageHeader)
{
	PrintPageHeader(pageHeader);	
}

/*
 * print item list selected in range
 */
void 
ProcessItemList(uint32 itemCount, uint32 numberOfPage)
{
	HeapTupleHeader		tupleHeader;
	LinkedList	*range;
	ItemId		itId;
	int			from;	
	int 		to;
	int			i;

	range = options.optItemList;


	do {		
		/* check and set */
		ProcessCheckRange(&from, &to, range, itemCount);

		/* process one by one range in input list */
		for (i = from; i <= to; i++) {
			itId = PageGetItemId(config.page, i);
			tupleHeader = GetHeapTupleHeader(config.page, ItemIdGetOffset(itId));

			PrintPageItemPointer(i, &itId);

			/* show used tuple only */
			if (ItemIdGetFlags(itId) == LP_USED)
				ProcessTuple(tupleHeader, ItemIdGetOffset(itId), ItemIdGetLength(itId));
			else
				continue;
	
		}
	} while ((range = range->next));	
}

/*
 * Show tuple header and tuple data. 
 * Input is tuple header, offset and length of tuple
 */
void
ProcessTuple(HeapTupleHeader tupleHeader, uint32 tupleOffset, uint32 tupleLength)
{
	/* 
	 * Process Tuple Header 
	 */
	if (options.optTupleHeader == 1)
		PrintHeapTupleHeader(tupleHeader);

	/* 
	 * Process Tuple Data 
	 */
	if (options.optTupleData == 1)
		PrintHeapTupleData(tupleHeader, tupleOffset, tupleLength);
}

/*
 * check range and set 'from' and 'to'
 * if one of the 'from' or 'to' is out of range, then use max
 * return values are 'from' and 'to'
 */
void
ProcessCheckRange(int *from, int *to, LinkedList *range, int max)
{
	/* check range values */
	if (range->from < max) {
		*from = range->from;
	}
	else {
		fprintf(stderr, "Item list is out of range: %d, max: %d\n", range->from, max);
		Clean();
		exit(1);
	}

	if (range->to < max) {
		*to = range->to;
	}
	else {
		fprintf(stderr, "Item list is out of range: %d, max: %d\n", range->to, max);
		Clean();
		exit(1);
	}
	/* use all items from 'from' to end */
	if (range->to == 0)
		*to = max;
	
	/* if 'to' was not set, then to = from for cycle stop
	 * it must be after condition which set to = config.numOfPage - 1
	 */
	if (range->to == -1)
		*to = *from;

}

/*
 * In function i will use another page. Hence i must remember reference page
 * and in the end i must return to this page.
 */
int
FollowCtid(void)
{
	HeapTupleHeader	tupleHeader;
	PageHeader		pageHeader;
	TransactionId	xmin = 0;
	TransactionId	xmax = 0;
	ItemPointerData	ctid;
	ItemId 			itId;
	uint32			ctidPos;
	uint32			itemsCount = 0;
	int				numberOfFile = -1;
	int 			numberOfPage = -1;
	int				numberOfItem = -1;
	int				prevNumberOfPage = -1;
	int				ret;


	/* refenrence page and tuple */
	numberOfPage = options.optPage->from;
	numberOfItem = options.optItemList->from;

	while (TRUE) {
		ret = GetPage(numberOfPage);
		if (ret == -1) {
			printf("Page is out of range: %d\n", numberOfPage);
			return -1;
		}
		
		/* check item number and its index */
		pageHeader = GetPageHeader(config.page);
		itemsCount = ItemsCount(pageHeader->pd_lower);
		if (numberOfItem > itemsCount) {
			fprintf(stderr, "Item is out of range: %d\n", itemsCount);
			return -1;
		}

		itId = PageGetItemId(config.page, numberOfItem);
		tupleHeader = GetHeapTupleHeader(config.page, ItemIdGetOffset(itId));

		/***
		 * output 
		 */
		/* print file name if is not same*/
		if (numberOfFile != ret && options.optPath != NULL) {
			numberOfFile = ret;
			if (config.fileSplit == 1)
				PrintFileName(config.fileName, numberOfFile);
		}

		/* printf page info if is another page */
		if (numberOfPage != prevNumberOfPage) {
			prevNumberOfPage = numberOfPage;
			ProcessPageInfo(numberOfPage, itemsCount);
		}
		
		/* item */
		PrintPageItemPointer(numberOfItem, &itId);

		/* show used tuple only */
		if (ItemIdGetFlags(itId) == LP_USED)
			ProcessTuple(tupleHeader, ItemIdGetOffset(itId), ItemIdGetLength(itId));
		else
			break;
		/*** end of output */

		
		/* go to the next tuple */
		xmin = HeapTupleHeaderGetXmin(tupleHeader);
		ctid = tupleHeader->t_ctid;	
		ctidPos = tupleHeader->t_ctid.ip_posid;

		if ((xmin != xmax) && (xmax != 0)) {
			fprintf(stderr, "Xmin is not equal to xmax in previous tuple\n");
			return -1;
		}

		xmax = HeapTupleHeaderGetXmax(tupleHeader);

		if (numberOfItem != ctidPos) {
			numberOfItem = ctidPos;
		
			/* next item's page */
			numberOfPage = ctid.ip_blkid.bi_lo;
		}
		else {
			break;
		}

	}

	return 0;
}

/* open file for reading */
int
OpenFile(char *absName)
{
	int		fr;
	
	fr = open(absName, O_RDONLY, S_IRWXU);
	if (fr == -1) {
		return -1;
	}
	
	return fr;
}

/* ReadPage gets whole page from file into the memory */
int
ReadPage(int fr, uint8 *page, int pageSize, off_t pageOffset)
{
	int		ret;

	ret = lseek(fr, pageOffset, SEEK_SET);
	if (ret == -1) {
		perror("Error while fseek on page");
		return -1;
	}

	ret = read(fr, page, pageSize);
	if (ret == -1) {
		perror("Error while reading page");
		return -1;
	}	

	return ret;
}

/* return size of page from pd_pagesize_version in page header */
uint32
_PageGetPageSize(int fr)
{
	uint32	pageSize;
	int		ret;

	lseek(fr, (off_t)(PAGE_HEADER - PD_PAGESIZE_VERSION), SEEK_SET);
	if (ret == -1) {
		perror("Error while reading page");
		return 0;
	}	

	read(fr, (off_t*) &pageSize, sizeof(unsigned));
	if (ret == -1) {
		perror("Error while reading page");
		return 0;
	}	

	return ( pageSize & 0xFF00 );
}

/*
 * PageGetFreeSpace
 * Returns the size of the free (allocatable) space on a page.
 * 
 * It is Copy&Paste from backend/storage/page/bufpage.c
 */
Size
PageGetFreeSpace(Page page)
{
	int		space;

	/*
	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
	 * pd_upper.
	 */
	space = (int) ((PageHeader) page)->pd_upper -
	(int) ((PageHeader) page)->pd_lower;

	if (space < (int) sizeof(ItemIdData))
                return 0;
	space -= sizeof(ItemIdData);    /* XXX not always appropriate */

	return (Size) space;
}


PageHeader
GetPageHeader(uint8 *page)
{
	PageHeader pHeader;

	pHeader = (PageHeader) page;

	return pHeader;
}

/*
 * f returns pointer to heap tuple data
 */
HeapTuple
GetHeapTuple(uint8 *page, uint16 offset)
{
	HeapTuple tuple;

	tuple = (HeapTuple) (page + offset);
	if (tuple == NULL) {
		perror("Can not read tuple");
		return NULL;
	}

	return tuple;
}

/*
 * f returns pointer to tuple header
 */
HeapTupleHeader
GetHeapTupleHeader(uint8 *page, uint16 offset)
{
	HeapTupleHeader tupleHeader;

	tupleHeader = (HeapTupleHeader) (page + offset);
	if (tupleHeader == NULL) {
		perror("Can not read tuple header");
		return NULL;
	}

	return tupleHeader;
}

/* return number of items */
int 
ItemsCount(int pLower)
{
	return ((pLower -  PAGE_HEADER) / ITEM_POINTER);
}
