锦标赛排序:多路归并

锦标赛排序:多路归并

利用树结构进行排序是很常见的,例如查找树中序遍历就得到有序序列,使用堆结构也可以实现排序。这里的“锦标赛排序”就构造了一种“赢者树”的结构来实现排序。

一棵赢者树分为内部节点和外部节点。外部节点就是原始的数据,内部节点指向外部节点索引(比如外部节点的数组下标),一般为了便于计算,我们将外部节点的数据量扩展为2的整数幂,内部结点的最底一层可以首先初始化为对应的外部节点索引。每个内部节点表示一场“比赛”,节点的值是左右子结点的“赢方”,对于数值型数字,我们可以定义数值更小的那个为胜(当然也可以定义数值更大的胜,或者其它规则,如何规定胜应当按需要而定),为了保证扩展的结点不影响比赛结果,当定义小者为胜时,我们可以将外部结点扩展出的结点是所有外部结点不可能达到的较大数值(例如int类型的最大值0x7FFFFFFF),这样无论怎么比,都不会有扩展出来的结点参与到比赛当中。

由于内部结点保存的是外部节点的索引,所以利用索引对数据比较,最终的胜者将出现在根节点,也就是最小值的下标。而赢者树存在这样一个特点,当决出比赛胜负后,如果参赛者进行了更换,那么只需要将参赛者所在的路径进行重赛,就能保证仍然是赢者树,也就是每次重赛的代价只有\( O(\log N )\) (N是外部结点的数目), 重赛的过程也与堆排序的上滤相似。

首先,我们使用赢者树,对一个含有N个元素的整数数组按升序输出。将我们的数组作为外部结点,计算数组所在的层需要扩展多少个,然后初始化这个赢者树,初始化时,从下向上逐层计算胜者,得到原始数据的最小值,输出它,并将这个索引所对的值改为INF,并对上一局的胜者所参与的所有比赛进行重赛,从而得到第2个胜者。依次类推将比赛进行N次,每次输出最小值,即可实现排序。初始化的时间为\( O(N) \),每次重赛的时间为\( O(\log(N)) \),共进行了N次重赛,所以用赢者树实现的“锦标赛排序”的时间复杂度是\( O(N \log(N)) \)的。

在内部节点的实现时,由于是完全二叉树,可以仿照堆的方式,利用一个一维数组的[1..N]位置而浪费[0]位置,这样方便计算父子结点的下标。

#include <cstdio>
#include <cmath>
const int MAXN=1000;
const int INF = 0x7FFFFFFF;
int parent(int r){return r/2;}
int lChild(int r){return r*2;}
int rChild(int r){return 1+r*2;}
int startLevel;
int Tree[MAXN];
void init(int *a)
{
	int i;
	int id = 0;
	for(i=(int)pow(2,startLevel);
		i<(int)pow(2,startLevel+1);
		i++)
	{
		Tree[i] = id;
		id++;
	}
	for(int k=startLevel-1;k>=0;k--)
	{
		for(i=(int)pow(2,k);
			i<(int)pow(2,k+1);
			i++)
		{
			int cl = lChild(i);
			int cr = rChild(i);
			if(a[Tree[cl]] < a[Tree[cr]])
			{
				Tree[i] = Tree[cl];
			}
			else
			{
				Tree[i] = Tree[cr];
			}
		}
	}
}
void rematch(int i,int *data)
{
	int treeIdx = i+(int)pow(2,startLevel);
	int p = parent(treeIdx);
	while(p>=1)
	{
		int cl = lChild(p);
		int cr = rChild(p);
		if(data[Tree[cl]] < data[Tree[cr]])
		{
			Tree[p] = Tree[cl];
		}
		else
		{
			Tree[p] = Tree[cr];
		}
		p=parent(p);
	}
}
int main(void)
{
	int N;
	int a[MAXN];
	scanf("%d",&N);
	startLevel = (int)(ceil(log(N)*1.0/log(2)));
	int i;
	for(i=0;i<N;i++)
	{
		scanf("%d",a+i);
	}
	int expandSize = (int)pow(2,startLevel);
	while(i<expandSize)
	{
		a[i] = INF;
		i++;
	}
	init(a);
	for(i=0;i<N;i++)
	{
		int winner = Tree[1];
		printf("%d\n",a[winner]);
		a[winner] = INF;
		rematch(winner,a);
	}
	return 0;
}

当然,这样看起来似乎并没有比堆排序更优秀,因为用树来保存索引的空间还是比较大的。不过,锦标赛排序的特点是“更换参赛者可以在\(O(\log N)\)的时间内完成重赛”,每次我们都将当前的胜者比赛之后置为INF,这个结点便不再使用,可能有些浪费,如果某个选手赛完之后,用新的选手来替换他继续比赛。同时还要为了保证有序性,新出场的选手需要比刚才的胜者更“弱”(数值小则越强,所以是更大的数值)。这样的话,我们若把多个有序的表的表头组织一棵赢者树,当决出冠军之后,它所在的那个表就将表头切换到下一个位置,直到某个表为空再置为INF,当所有的表都为空,则完成多个有序表的合并。

这里使用一个5层的赢者树,有32个外部节点,这里设计每个外部结点是有序的vector的首元素,将所有数据轮流输入到这些vector当中,使用赢者树把这些数据归并成一个有序的序列。

#include <cstdio>
#include <vector>
#include <cmath>
#include <algorithm>
const int INF = 0x7FFFFFFF;
const int treeLevel = 5;
const int V_COUNT = (1<<treeLevel);
std::vector<int> v[V_COUNT];
int vectptr[V_COUNT];
int WinnerTree[1+2*V_COUNT];
int parent(int r){return r/2;}
int lChild(int r){return 2*r;}
int rChild(int r){return 2*r+1;}
int getTreeOriginData(int i)
{
	return v[ i ][ vectptr[i] ];
}
void outputTree(int id,int level)
{
	if(lChild(id)<2*V_COUNT)
	{
		outputTree(lChild(id),level+1);
	}
	for(int i=0;i<level;i++)
	{
		printf("---------");
	}
	printf("[%d=(%d)]\n",WinnerTree[id],getTreeOriginData(WinnerTree[id]));
	if(rChild(id)<2*V_COUNT)
	{
		outputTree(rChild(id),level+1);
	}
}
void buildTree()
{
	int i;
	int id = 0;
	for(i=(int)pow(2,treeLevel);
		i<(int)pow(2,treeLevel+1);
		i++)
	{
		WinnerTree[i] = id;
		id++;
	}
	for(int k=treeLevel-1;k>=0;k--)
	{
		for(i=(int)pow(2,k);
			i<(int)pow(2,k+1);
			i++)
		{
			int cl = lChild(i);
			int cr = rChild(i);
			if(getTreeOriginData(WinnerTree[cl]) < getTreeOriginData(WinnerTree[cr]))
			{
				WinnerTree[i] = WinnerTree[cl];
			}
			else
			{
				WinnerTree[i] = WinnerTree[cr];
			}
		}
	}
}
void rematch(int i)
{
	int treeIdx = (int)pow(2,treeLevel)+i;
	int p = parent(treeIdx);
	while(p>=1)
	{
		int cl = lChild(p);
		int cr = rChild(p);
		if(getTreeOriginData(WinnerTree[cl]) < getTreeOriginData(WinnerTree[cr]))
		{
			WinnerTree[p] = WinnerTree[cl];
		}
		else
		{
			WinnerTree[p] = WinnerTree[cr];
		}
		p=parent(p);
	}
}
int main(void)
{
	int N;
	int x;
	int i=0;
	scanf("%d",&N);
	for(i=0;i<N;i++)
	{
		scanf("%d",&x);
		v[i%V_COUNT].push_back(x);
	}
	for(i=0;i<V_COUNT;i++)
	{
		v[i].push_back(INF);
		std::sort(v[i].begin(),v[i].end());
	}
	buildTree();
	int winner;
	for(i=0;i<N;i++)
	{
		if(i>0)
			printf(" ");
		winner = WinnerTree[1];
		printf("%d",getTreeOriginData(winner));
		vectptr[winner]++;
		rematch(winner);
	}
	printf("\n");
	return 0;
}

这可以应用在一个常见的问题:内存空间远远小于数据文件大小,如何对数据文件进行排序,第一步很容易想到,就是将大文件分成小文件,先将小文件排序,使每个小文件是自身有序的,即文件的第一条数据一定是这个文件中最小的,第2条数据是次小的,…。现在再将这些中间文件合并成一个大文件而保证其有序性,如果两两合并,那么又需要再生成多次中间文件,这样并不是好的方案。这里利用赢者树进行合并:将所有文件的第一条数据作为外部结点,当一轮锦标赛决出胜者之后,将赢者所对的这条数据输出到最终文件,然后把赢者所对应的那个文件指针指向下一条数据,再重新进行一轮锦标赛,当某个文件读到EOF时,相对应的外部结点为INF,直到所有文件都为EOF,说明所有的数据已经全部写入最终文件。

例如,在本地已经将in/*.data的小文件排好自身的顺序,要将in/*.data中部分有序的文件合并到out/res.data当中,假设in/*.data中文件号从1到7共7个文件。

#include <cstdio>
#include <cmath>
#include <cstring>
const int INF = 0x7FFFFFFF;
const int FILENUM = 7;
const int MAXN = 1000;
int parent(int r){return r/2;}
int lChild(int r){return r*2;}
int rChild(int r){return 1+r*2;}
int startLevel = (int)(ceil(log(FILENUM)*1.0/log(2)));
void initTree(FILE **fp,int *data,int *tree)
{
	int i;
	int sz = pow(2,startLevel+1);
	for(i=0;i<FILENUM;i++)
	{
		fscanf(fp[i],"%d",data+i);
	}
	while(i<sz)
	{
		data[i] = INF;
		i++;
	}
	int id = 0;
	for(i=pow(2,startLevel);i<pow(2,startLevel+1);i++)
	{
		tree[i] = id;
		id++;
	}
	for(int k=startLevel-1;k>=0;k--)
	{
		for(i=pow(2,k);i<pow(2,k+1);i++)
		{
			int cl = lChild(i);
			int cr = rChild(i);
			if(data[tree[cl]] < data[tree[cr]])
			{
				tree[i] = tree[cl];
			}
			else
			{
				tree[i] = tree[cr];
			}
		}
	}
}
void replay(int i,int *tree,int *data)
{
	int treeIdx = i+(int)pow(2,startLevel);
	int p = parent(treeIdx);
	while(p>=1)
	{
		int cl = lChild(p);
		int cr = rChild(p);
		if(data[tree[cl]] < data[tree[cr]])
		{
			tree[p] = tree[cl];
		}
		else
		{
			tree[p] = tree[cr];
		}
		p=parent(p);
	}
}
void race(FILE **fp,FILE *fout,int *data,int *tree)
{
	int EofNum=0;
	while(EofNum<FILENUM)
	{
		int winner = tree[1];
		fprintf(fout,"%d\n",data[winner]);
		if(fscanf(fp[winner],"%d",data+winner)==EOF)
		{
			EofNum++;
			data[winner] = INF;
		}
		replay(winner,tree,data);
	}
}
int main(void)
{
	int i;
	FILE *(fp[FILENUM]);
	FILE *fout;
	char fileName[50];
	int data[(int)pow(2,startLevel)];
	int tree[MAXN];
	for(i=0;i<FILENUM;i++)
	{
		sprintf(fileName,"in/%d.data",i+1);
		fp[i] = fopen(fileName,"r");
	}
	fout = fopen("out/res.data","w");
	initTree(fp,data,tree);
	race(fp,fout,data,tree);
	for(i=0;i<FILENUM;i++)
	{
		fclose(fp[i]);
	}
	fclose(fout);
	return 0;
}

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注