persistentstorage/dbms/pcdbms/utable/UT_ORDER.CPP
changeset 0 08ec8eefde2f
equal deleted inserted replaced
-1:000000000000 0:08ec8eefde2f
       
     1 // Copyright (c) 1998-2009 Nokia Corporation and/or its subsidiary(-ies).
       
     2 // All rights reserved.
       
     3 // This component and the accompanying materials are made available
       
     4 // under the terms of "Eclipse Public License v1.0"
       
     5 // which accompanies this distribution, and is available
       
     6 // at the URL "http://www.eclipse.org/legal/epl-v10.html".
       
     7 //
       
     8 // Initial Contributors:
       
     9 // Nokia Corporation - initial contribution.
       
    10 //
       
    11 // Contributors:
       
    12 //
       
    13 // Description:
       
    14 // Order-by stage for the cursor data "pipeline"
       
    15 // 
       
    16 //
       
    17 
       
    18 #include "UT_STD.H"
       
    19 #include "D32COMP.H"
       
    20 
       
    21 #ifdef _DEBUG
       
    22 #define _CHECK_ORDERING		// forces a full test of the ordering after sorting
       
    23 #endif
       
    24 
       
    25 #define MAX( a, b ) ( (a) > (b) ? (a) : (b) )
       
    26 
       
    27 /* The key structure.
       
    28 
       
    29 Each key value is always aligned on a 32-bit boundary to allow word reads and writes
       
    30 integral values are always atored as 32-bit values just as for the row buffer
       
    31 Text columns are encoded as follows (trailing padding is omitted from the description)
       
    32 
       
    33   Text8 columns			<byte n><n ASCII characters>
       
    34   n is the [character] length of the column
       
    35 
       
    36   Text16 columns		<short n><n UNICODE characters>
       
    37   n is the [character] length of the column
       
    38 
       
    39   LongText8 columns		<word n><n ASCII characters>
       
    40                  or		<word n|ETrunc><32 ASCII characters><blobId>
       
    41   n is [byte] size of the column
       
    42 
       
    43   LongText16 columns	<word n><n/2 UNICODE characters>
       
    44                   or	<word n|ETrunc><16 UNICODE characters><blobId>
       
    45   n is [byte] size of the column
       
    46 
       
    47 */
       
    48 
       
    49 
       
    50 class CDbOrderByStage::HOrdering
       
    51 	{
       
    52 private:
       
    53 	struct TKeyCol
       
    54 		{
       
    55 		TDbColNo iOrdinal;
       
    56 		TUint8 iType;
       
    57 		TUint8 iOrder;
       
    58 		TUint8 iLength;
       
    59 		};
       
    60 	struct TBlobKey
       
    61 		{
       
    62 		enum {ETrunc=(TInt)0x80000000};
       
    63 		enum {ETruncSize=32};		// store 32 bytes of truncated BLOBs
       
    64 	public:
       
    65 		inline TInt Size() const;
       
    66 	public:
       
    67 		TInt iSize;
       
    68 		union {TUint8 iData8[ETruncSize]; TUint16 iData16[ETruncSize>>1];};
       
    69 		TDbBlobId iId;
       
    70 		};
       
    71 public:
       
    72 	static HOrdering* NewL(CDbTable& aTable,const CDbKey& aKey);
       
    73 	TAny* EntryL(TAny* aPtr,const RDbTableRow& aRow) const;
       
    74 	TInt CompareL(const TAny* aLeft,const TAny* aRight) const;
       
    75 	TInt MaxSize() const;
       
    76 private:
       
    77 	inline HOrdering(TInt aCount,TDbTextComparison aComparison,CDbTable& aTable);
       
    78 	MStreamBuf* BlobLC(TDbBlobId aId,TDbColType aType) const;
       
    79 	TInt CompareLongText8L(const TBlobKey& aLeft,const TBlobKey& aRight) const;
       
    80 	TInt CompareLongText16L(const TBlobKey& aLeft,const TBlobKey& aRight) const;
       
    81 private:
       
    82 	CDbTable& iTable;
       
    83 	const TTextOps& iTextOps;
       
    84 	const TKeyCol* iEndOfKeys;
       
    85 	TKeyCol iKeys[1];	// one or more keys
       
    86 	};
       
    87 
       
    88 // Class HDbOrdering
       
    89 
       
    90 inline TInt CDbOrderByStage::HOrdering::TBlobKey::Size() const
       
    91 	{
       
    92 	TInt size=_FOFF(TBlobKey,iData8[iSize+3]);
       
    93 	return (size&ETrunc) ? sizeof(TBlobKey) : size&~3;
       
    94 	}
       
    95 
       
    96 inline CDbOrderByStage::HOrdering::HOrdering(TInt aCount,TDbTextComparison aComparison,CDbTable& aTable) :
       
    97 	iTable(aTable),
       
    98 	iTextOps(TTextOps::Ops(aComparison)),
       
    99 	iEndOfKeys(&iKeys[aCount])
       
   100 	{
       
   101 	}
       
   102 
       
   103 //
       
   104 // Construct the ordering based on the key definition (must be valid for the table)
       
   105 // and the column set provided
       
   106 //
       
   107 CDbOrderByStage::HOrdering* CDbOrderByStage::HOrdering::NewL(CDbTable& aTable,const CDbKey& aKey)
       
   108 	{
       
   109 	TInt count=aKey.Count();
       
   110 	__ASSERT(count>0);
       
   111 	HOrdering* self=new(User::AllocLC(_FOFF(HOrdering,iKeys[count])))
       
   112 								HOrdering(count,aKey.Comparison(),aTable);
       
   113 	TKeyCol* pKey=&self->iKeys[0];
       
   114 	const HDbColumnSet& columns=aTable.Def().Columns();
       
   115 	for (TInt ii=0;ii<count;++pKey,++ii)
       
   116 		{
       
   117 		const TDbKeyCol& key=aKey[ii];
       
   118 		pKey->iOrder=TUint8(key.iOrder);
       
   119 		pKey->iOrdinal=columns.ColNoL(key.iName);
       
   120 		if (pKey->iOrdinal==KDbNullColNo)
       
   121 			__LEAVE(KErrNotFound);
       
   122 		const TDbColumnDef& col=columns[pKey->iOrdinal];
       
   123 		switch (pKey->iType=col.iType)
       
   124 			{
       
   125 		default:
       
   126 			break;
       
   127 		case EDbColText8:
       
   128 		case EDbColText16:
       
   129 			{
       
   130 			TInt l=col.iMaxLength;
       
   131 			if (l<256)
       
   132 				{
       
   133 				pKey->iLength=TUint8(l);
       
   134 				break;
       
   135 				}
       
   136 			}
       
   137 			// fall through to argument error
       
   138 		case EDbColBinary:
       
   139 		case EDbColLongBinary:
       
   140 			__LEAVE(KErrArgument);
       
   141 			}
       
   142 		}
       
   143 	CleanupStack::Pop();
       
   144 	return self;
       
   145 	}
       
   146 
       
   147 TInt CDbOrderByStage::HOrdering::MaxSize() const
       
   148 	{
       
   149 	TInt maxsize=0;
       
   150 	const TKeyCol* const end=iEndOfKeys;
       
   151 	const TKeyCol* key=&iKeys[0];
       
   152 	__ASSERT(key<end);
       
   153 	do	{
       
   154 		TInt s;
       
   155 		switch (key->iType)
       
   156 			{
       
   157 		default:
       
   158 			s=sizeof(TUint32);
       
   159 			break;
       
   160 		case EDbColInt64:
       
   161 			s=sizeof(TInt64);
       
   162 			break;
       
   163 		case EDbColDateTime:
       
   164 			s=sizeof(TTime);
       
   165 			break;
       
   166 		case EDbColReal64:
       
   167 			s=sizeof(TReal64);
       
   168 			break;
       
   169 		case EDbColText8:
       
   170 			s=Align4(key->iLength+1);
       
   171 			break;
       
   172 		case EDbColText16:
       
   173 			s=Align4((key->iLength<<1)+1);
       
   174 			break;
       
   175 		case EDbColLongText8:
       
   176 		case EDbColLongText16:
       
   177 			s=MAX(__Align8(sizeof(TUint32)+KDbMaxInlineBlobSize),sizeof(TBlobKey));
       
   178 			break;
       
   179 			}
       
   180 		maxsize+=s;
       
   181 		} while (++key<end);
       
   182 	return maxsize;
       
   183 	}
       
   184 
       
   185 MStreamBuf* CDbOrderByStage::HOrdering::BlobLC(TDbBlobId aId,TDbColType aType) const
       
   186 	{
       
   187 	return iTable.BlobsL()->ReadLC(aId,aType);
       
   188 	}
       
   189 
       
   190 //
       
   191 // Construct an entry at aPtr from the record given
       
   192 // iMaxSize bytes are assumed to be available at aPtr
       
   193 // return the actual size of the entry
       
   194 //
       
   195 TAny* CDbOrderByStage::HOrdering::EntryL(TAny* aPtr,const RDbTableRow& aRow) const
       
   196 	{
       
   197 	__ASSERT(Align4(aPtr)==aPtr);
       
   198 	TUint32* ptr=(TUint32*)aPtr;		// 32-bit words are nice
       
   199 	const TKeyCol* const end=iEndOfKeys;
       
   200 	const TKeyCol* key=iKeys;
       
   201 	__ASSERT(key<end);
       
   202 	do	{
       
   203 		const TDbCell* cell=aRow.ColCell(key->iOrdinal);
       
   204 		TDbColType type=TDbColType(key->iType);
       
   205 		if (cell->Length()==0)
       
   206 			{	// null column
       
   207 			const TUint K64BitColumnMask=(1u<<EDbColInt64)|(1u<<EDbColReal64)|(1u<<EDbColDateTime);
       
   208 			*ptr++=0;
       
   209 			if (K64BitColumnMask&(1u<<type))
       
   210 				*ptr++=0;		// 8-byte column
       
   211 			continue;
       
   212 			}
       
   213 		switch (type)
       
   214 			{
       
   215 		default:
       
   216 			__ASSERT(0);
       
   217 		case EDbColBit:
       
   218 		case EDbColUint8:
       
   219 		case EDbColUint16:
       
   220 		case EDbColUint32:
       
   221 		case EDbColInt8:
       
   222 		case EDbColInt16:
       
   223 		case EDbColInt32:
       
   224 		case EDbColReal32:
       
   225 			*ptr++=*(const TUint32*)cell->Data();
       
   226 			break;
       
   227 		case EDbColInt64:
       
   228 		case EDbColDateTime:
       
   229 		case EDbColReal64:
       
   230 			{
       
   231 			const TUint32* data=(const TUint32*)cell->Data();
       
   232 			*ptr++=*data++;
       
   233 			*ptr++=*data;
       
   234 			}
       
   235 			break;
       
   236 		case EDbColText8:
       
   237 			{
       
   238 			TInt size=cell->Length();
       
   239 			*(TUint8*)ptr=TUint8(size);
       
   240 			ptr=(TUint32*)Align4(Mem::Copy(PtrAdd(ptr,1),cell->Data(),size));
       
   241 			}
       
   242 			break;
       
   243 		case EDbColText16:
       
   244 			{
       
   245 			TInt size=cell->Length();
       
   246 			*(TUint16*)ptr=TUint16(size>>1);
       
   247 			ptr=(TUint32*)Align4(Mem::Copy(PtrAdd(ptr,2),cell->Data(),size));
       
   248 			}
       
   249 			break;
       
   250 		case EDbColLongText8:
       
   251 		case EDbColLongText16:
       
   252 			{
       
   253 			const TDbBlob& blob=*(const TDbBlob*)cell->Data();
       
   254 			TInt size=blob.Size();
       
   255 			if (blob.IsInline())
       
   256 				{
       
   257 				*ptr++=size;
       
   258 				ptr=(TUint32*)Align4(Mem::Copy(ptr,blob.Data(),size));
       
   259 				}
       
   260 			else
       
   261 				{
       
   262 				TInt rsize=size;
       
   263 				if (size>TBlobKey::ETruncSize)
       
   264 					{
       
   265 					size|=TBlobKey::ETrunc;
       
   266 					rsize=TBlobKey::ETruncSize;
       
   267 					}
       
   268 				*ptr++=size;
       
   269 				BlobLC(blob.Id(),TDbColType(key->iType))->ReadL(ptr,rsize);
       
   270 				CleanupStack::PopAndDestroy();
       
   271 				ptr=Align4(PtrAdd(ptr,rsize));
       
   272 				if (size&TBlobKey::ETrunc)
       
   273 					*ptr++=blob.Id();
       
   274 				}
       
   275 			}
       
   276 			break;
       
   277 			}
       
   278 		} while (++key<end);
       
   279 	return ptr;
       
   280 	}
       
   281 
       
   282 TInt CDbOrderByStage::HOrdering::CompareLongText8L( const CDbOrderByStage::HOrdering::TBlobKey& aLeft, const CDbOrderByStage::HOrdering::TBlobKey& aRight ) const
       
   283 	{
       
   284 	TUint sizel = aLeft.iSize;
       
   285 	TUint sizer = aRight.iSize;
       
   286 	TUint s = sizel;
       
   287 	if ( sizer < s )
       
   288 		s = sizer;
       
   289 	if ( s > static_cast<TUint>( TBlobKey::ETruncSize ) && ( ( sizel | sizer ) & static_cast<TUint>( TBlobKey::ETrunc ) ) )
       
   290 		s = TBlobKey::ETruncSize;
       
   291 	TInt rr = iTextOps.Compare( aLeft.iData8, s, aRight.iData8, s );
       
   292 	if ( rr )
       
   293 		return rr;
       
   294 //
       
   295 // matches up to the same-length inlined data...
       
   296 // now it gets interesting
       
   297 //
       
   298 	if ( ( ( sizel | sizer ) & static_cast<TUint>( TBlobKey::ETrunc ) ) == 0 )
       
   299 		return sizel - sizer;		// neither are truncated
       
   300 	if ( s == sizel )
       
   301 		return -1;		// left completely match against truncated right
       
   302 	if ( s == sizer )
       
   303 		return 1;		// right completely matched against truncated left
       
   304 
       
   305 	s = Min( TInt( sizel & ~TBlobKey::ETrunc ), TInt( sizer & ~TBlobKey::ETrunc ) );		// minimum real length
       
   306 	if ( sizel & static_cast<TUint>( TBlobKey::ETrunc ) )
       
   307 		{
       
   308 		MStreamBuf& bufL = *BlobLC( aLeft.iId, EDbColLongText8 );
       
   309 		if ( sizer & static_cast<TUint>( TBlobKey::ETrunc ) )
       
   310 			{	// both out-of-line
       
   311 			rr = Comp::Compare8L( bufL, *BlobLC( aRight.iId, EDbColLongText8 ), s, iTextOps );
       
   312 			CleanupStack::PopAndDestroy();
       
   313 			}
       
   314 		else	// left side only out-of-line
       
   315 			rr = Comp::Compare8L( bufL, aRight.iData8, s, iTextOps );
       
   316 		}
       
   317 	else
       
   318 		{	// right side only out-of-line
       
   319 		__ASSERT( sizer & static_cast<TUint>( TBlobKey::ETrunc ) );
       
   320 		rr = -Comp::Compare8L( *BlobLC( aRight.iId, EDbColLongText8 ), aLeft.iData8, s, iTextOps );
       
   321 		}
       
   322 	CleanupStack::PopAndDestroy();
       
   323 	return rr ? rr : ( sizel & ~TBlobKey::ETrunc ) - ( sizer & ~TBlobKey::ETrunc );
       
   324 	}
       
   325 
       
   326 TInt CDbOrderByStage::HOrdering::CompareLongText16L( const CDbOrderByStage::HOrdering::TBlobKey& aLeft, const CDbOrderByStage::HOrdering::TBlobKey& aRight ) const
       
   327 	{
       
   328 	TUint sizeL = aLeft.iSize  & ~TBlobKey::ETrunc; // set truncation bit to 0 to get true size
       
   329 	TUint sizeR = aRight.iSize & ~TBlobKey::ETrunc;
       
   330 	TBool truncL = aLeft.iSize  & TBlobKey::ETrunc; // data is truncated if TBlobKey::ETrunc bit is 1
       
   331 	TBool truncR = aRight.iSize & TBlobKey::ETrunc;
       
   332 	
       
   333 	if (!(truncL | truncR)) // neither side is truncated, compare full strings
       
   334 		{		
       
   335 		return iTextOps.Order( aLeft.iData16, sizeL>>1, aRight.iData16, sizeR>>1 );
       
   336 		}
       
   337 		
       
   338 	TInt result;
       
   339 	TUint sizeMin = Min( TInt(sizeL), TInt(sizeR) ); // minimum real length
       
   340 	if ( truncL )
       
   341 		{
       
   342 		MStreamBuf& bufL = *BlobLC( aLeft.iId, EDbColLongText16 );
       
   343 		if ( truncR )
       
   344 			{	// both out-of-line
       
   345 			result = Comp::Compare16L( bufL, *BlobLC( aRight.iId, EDbColLongText16 ), sizeMin, iTextOps );
       
   346 			CleanupStack::PopAndDestroy();
       
   347 			}
       
   348 		else	// left side only out-of-line
       
   349 			result = Comp::Compare16L( bufL, aRight.iData16, sizeMin, iTextOps );
       
   350 		}
       
   351 	else
       
   352 		{	// right side only out-of-line
       
   353 		__ASSERT( truncR );
       
   354 		result = -Comp::Compare16L( *BlobLC( aRight.iId, EDbColLongText16 ), aLeft.iData16, sizeMin, iTextOps );
       
   355 		}
       
   356 	CleanupStack::PopAndDestroy();
       
   357 	return result ? result : ( sizeL ) - ( sizeR );
       
   358 	}
       
   359 
       
   360 TInt CDbOrderByStage::HOrdering::CompareL(const TAny* aLeft,const TAny* aRight) const
       
   361 //
       
   362 // compare the keys
       
   363 //
       
   364 	{
       
   365 	const TUint8* left=(const TUint8*)aLeft;
       
   366 	const TUint8* right=(const TUint8*)aRight;
       
   367 	const TKeyCol* end=iEndOfKeys;
       
   368 	const TKeyCol* key=&iKeys[0];
       
   369 	TInt rr;
       
   370 	for (;;)
       
   371 		{
       
   372 		switch (key->iType)
       
   373 			{
       
   374 		default:
       
   375 			__ASSERT(0);
       
   376 		case EDbColBit:
       
   377 		case EDbColUint8:
       
   378 		case EDbColUint16:
       
   379 		case EDbColUint32:
       
   380 			rr=Comp::Compare(*(const TUint32*)left,*(const TUint32*)right);
       
   381 			left+=sizeof(TUint32);
       
   382 			right+=sizeof(TUint32);
       
   383 			break;
       
   384 		case EDbColInt8:
       
   385 		case EDbColInt16:
       
   386 		case EDbColInt32:
       
   387 			rr=Comp::Compare(*(const TInt32*)left,*(const TInt32*)right);
       
   388 			left+=sizeof(TInt32);
       
   389 			right+=sizeof(TInt32);
       
   390 			break;
       
   391 		case EDbColInt64:
       
   392 			rr=Comp::Compare(*(const TInt64*)left,*(const TInt64*)right);
       
   393 			left+=sizeof(TInt64);
       
   394 			right+=sizeof(TInt64);
       
   395 			break;
       
   396 		case EDbColDateTime:
       
   397 			rr=Comp::Compare(*(const TTime*)left,*(const TTime*)right);
       
   398 			left+=sizeof(TTime);
       
   399 			right+=sizeof(TTime);
       
   400 			break;
       
   401 		case EDbColReal32:
       
   402 			rr=Comp::Compare(*(const TReal32*)left,*(const TReal32*)right);
       
   403 			left+=sizeof(TReal32);
       
   404 			right+=sizeof(TReal32);
       
   405 			break;
       
   406 		case EDbColReal64:
       
   407 			rr=Comp::Compare(*(const TReal64*)left,*(const TReal64*)right);
       
   408 			left+=sizeof(TReal64);
       
   409 			right+=sizeof(TReal64);
       
   410 			break;
       
   411 		case EDbColText8:
       
   412 			{
       
   413 			TInt sizel=*left++;
       
   414 			TInt sizer=*right++;
       
   415 			rr=iTextOps.Compare(left,sizel,right,sizer);
       
   416 			left=Align4(left+sizel);
       
   417 			right=Align4(right+sizer);
       
   418 			}
       
   419 			break;
       
   420 		case EDbColText16:
       
   421 			{
       
   422 			const TUint16* l16=reinterpret_cast<const TUint16*>(left);
       
   423 			const TUint16* r16=reinterpret_cast<const TUint16*>(right);
       
   424 			TInt sizel=*l16++;
       
   425 			TInt sizer=*r16++;
       
   426 			rr=iTextOps.Order(l16,sizel,r16,sizer);
       
   427 			left=Align4(reinterpret_cast<const TUint8*>(l16+sizel));
       
   428 			right=Align4(reinterpret_cast<const TUint8*>(r16+sizer));
       
   429 			}
       
   430 			break;
       
   431 		case EDbColLongText8:
       
   432 			{
       
   433 			const TBlobKey* ltext=(const TBlobKey*)left;
       
   434 			const TBlobKey* rtext=(const TBlobKey*)right;
       
   435 			rr=CompareLongText8L(*ltext,*rtext);
       
   436 			left+=ltext->Size();
       
   437 			right+=rtext->Size();
       
   438 			}
       
   439 			break;
       
   440 		case EDbColLongText16:
       
   441 			{
       
   442 			const TBlobKey* ltext=(const TBlobKey*)left;
       
   443 			const TBlobKey* rtext=(const TBlobKey*)right;
       
   444 			rr=CompareLongText16L(*ltext,*rtext);
       
   445 			left+=ltext->Size();
       
   446 			right+=rtext->Size();
       
   447 			}
       
   448 			break;
       
   449 			}
       
   450 		if (rr!=0)
       
   451 			break;
       
   452 		if (++key==end)
       
   453 			break;
       
   454 		}
       
   455 	return key->iOrder==TDbKeyCol::EAsc ? rr : -rr;
       
   456 	}
       
   457 
       
   458 //
       
   459 
       
   460 NONSHARABLE_CLASS(CDbOrderByStage::CKeys) : public CBase
       
   461 	{
       
   462 public:		// should be private but VC++ 4.0 whinges
       
   463 	struct TKey
       
   464 		{
       
   465 		TUint32 iId;
       
   466 		TUint8 iKey[4];		// and then some
       
   467 		};
       
   468 private:
       
   469 	enum {EKeysGranularity=32};
       
   470 	struct TPage
       
   471 		{
       
   472 		TPage* iNext;
       
   473 		TKey iEntries[1];
       
   474 		};
       
   475 	enum {EMinPageSize=0x200,EMinKeyElements=2};
       
   476 public:
       
   477 	CKeys(TInt aMaxKeySize);
       
   478 	~CKeys();
       
   479 //
       
   480 	void AddL(TDbRecordId aRecordId,const RDbTableRow& aRow,const HOrdering& aOrder);
       
   481 	void SortL(const HOrdering& aOrder);
       
   482 	void GetRecordsL(CArrayFix<TDbRecordId>& aRecords);
       
   483 #ifdef _CHECK_ORDERING
       
   484 	void VerifyOrderingL(const HOrdering& aOrder);
       
   485 #endif
       
   486 private:
       
   487 	void SortL(TKey* aData[],TInt aElem,const HOrdering& aOrder);
       
   488 	TKey& NewKeyL();
       
   489 	void KeyComplete(TAny* aEnd);
       
   490 	void Release();
       
   491 private:
       
   492 	RPointerArray<TKey> iKeys;
       
   493 	TInt iMaxKeySize;
       
   494 	TInt iPageSize;
       
   495 	TPage* iPages;
       
   496 	TKey* iNext;
       
   497 	TKey* iEnd;
       
   498 	};
       
   499 
       
   500 // Class CDbOrderByStage::CKeys
       
   501 
       
   502 CDbOrderByStage::CKeys::CKeys(TInt aMaxKeySize)
       
   503 	:iKeys(EKeysGranularity),iMaxKeySize(_FOFF(TKey,iKey[aMaxKeySize])),
       
   504 	 iPageSize(Max(EMinPageSize+iMaxKeySize,iMaxKeySize*EMinKeyElements))
       
   505 	{}
       
   506 
       
   507 CDbOrderByStage::CKeys::~CKeys()
       
   508 	{
       
   509 	iKeys.Close();
       
   510 	Release();
       
   511 	}
       
   512 
       
   513 void CDbOrderByStage::CKeys::SortL(CDbOrderByStage::CKeys::TKey* aData[],TInt aElem,const HOrdering& aOrder)
       
   514 //
       
   515 // Sort the array of row keys
       
   516 // Uses Heap-sort
       
   517 //
       
   518 	{
       
   519 	__ASSERT(aElem>1);
       
   520 	TInt heap=aElem>>1;
       
   521 	--aElem;
       
   522 	for (;;)
       
   523 		{
       
   524 		TKey* key;
       
   525 		if (heap!=0)
       
   526 			key=aData[--heap];
       
   527 		else
       
   528 			{
       
   529 			key=aData[aElem];
       
   530 			aData[aElem]=aData[0];
       
   531 			if (--aElem==0)
       
   532 				{
       
   533 				aData[0]=key;
       
   534 				break;
       
   535 				}
       
   536 			}
       
   537 		TInt ix=heap;
       
   538 		TInt parent;
       
   539 		for (parent=ix;;parent=ix)
       
   540 			{
       
   541 			ix=(ix<<1)+1;
       
   542 			if (ix<=aElem)
       
   543 				{
       
   544 				if (ix<aElem && aOrder.CompareL(aData[ix]->iKey,aData[ix+1]->iKey)<0)
       
   545 					++ix;
       
   546 				}
       
   547 			else
       
   548 				break;
       
   549 			if (aOrder.CompareL(aData[ix]->iKey,key->iKey)<=0)
       
   550 				break;
       
   551 			aData[parent]=aData[ix];
       
   552 			}
       
   553 		aData[parent]=key;
       
   554 		}
       
   555 	}
       
   556 
       
   557 void CDbOrderByStage::CKeys::AddL(TDbRecordId aRecordId,const RDbTableRow& aRow,const HOrdering& aOrder)
       
   558 //
       
   559 // Create a key for the row and store it
       
   560 //
       
   561 	{
       
   562 	TKey& key=NewKeyL();
       
   563 	key.iId=aRecordId.Value();
       
   564 	TAny* end=aOrder.EntryL(&key.iKey[0],aRow);
       
   565 	__LEAVE_IF_ERROR(iKeys.Append(&key));
       
   566 	KeyComplete(end);
       
   567 	}
       
   568 
       
   569 void CDbOrderByStage::CKeys::SortL(const HOrdering& aOrder)
       
   570 	{
       
   571 	TInt elem=iKeys.Count();
       
   572 	if (elem>1)
       
   573 		SortL(&iKeys[0],elem,aOrder);
       
   574 	}
       
   575 
       
   576 void CDbOrderByStage::CKeys::GetRecordsL(CArrayFix<TDbRecordId>& aRecords)
       
   577 	{
       
   578 	TInt elem=iKeys.Count();
       
   579 	if (elem==0)
       
   580 		return;
       
   581 	TKey** const base=&iKeys[0];
       
   582 	TKey** ptr=base+elem;
       
   583 	do	{
       
   584 		--ptr;
       
   585 		*REINTERPRET_CAST(TDbRecordId*,ptr)=(*ptr)->iId;
       
   586 		} while (ptr>base);
       
   587 	Release();		// discard key memory before adding records
       
   588 	aRecords.InsertL(0,REINTERPRET_CAST(const TDbRecordId*,base),elem);
       
   589 	iKeys.Reset();
       
   590 	}
       
   591 
       
   592 void CDbOrderByStage::CKeys::Release()
       
   593 //
       
   594 // Discard all of the allocated pages
       
   595 //
       
   596 	{
       
   597 	TPage* p=iPages;
       
   598 	while (p)
       
   599 		{
       
   600 		TPage* n=p->iNext;
       
   601 		User::Free(p);
       
   602 		p=n;
       
   603 		}
       
   604 	iPages=0;
       
   605 	iNext=iEnd=0;
       
   606 	}
       
   607 
       
   608 CDbOrderByStage::CKeys::TKey& CDbOrderByStage::CKeys::NewKeyL()
       
   609 //
       
   610 // Allocate a key entry for the raw key data
       
   611 //
       
   612 	{
       
   613 	TKey* p=iNext;
       
   614 	if (PtrAdd(p,iMaxKeySize)>iEnd)
       
   615 		{	// not enough space for a maximum key
       
   616 		TPage* page=iPages;
       
   617 		if (page)
       
   618 			{	// compress the current page
       
   619 			__DEBUG(page=(TPage*))User::ReAlloc(page,(TUint8*)iNext-(TUint8*)page);
       
   620 			__ASSERT(page==iPages);		// if it moves we are dead
       
   621 			}
       
   622 		page=(TPage*)User::AllocL(_FOFF(TPage,iEntries)+iPageSize);
       
   623 		page->iNext=iPages;
       
   624 		iPages=page;
       
   625 		p=iNext=&page->iEntries[0];
       
   626 		iEnd=PtrAdd(p,iPageSize);
       
   627 		}
       
   628 	return *p;
       
   629 	}
       
   630 
       
   631 void CDbOrderByStage::CKeys::KeyComplete(TAny* aEnd)
       
   632 //
       
   633 // Key is complete, prepare for the next one
       
   634 //
       
   635 	{
       
   636 	__ASSERT(aEnd==Align4(aEnd));
       
   637 	__ASSERT(iNext<=aEnd&&aEnd<=iEnd);
       
   638 	iNext=STATIC_CAST(TKey*,aEnd);
       
   639 	}
       
   640 
       
   641 #ifdef _CHECK_ORDERING
       
   642 void CDbOrderByStage::CKeys::VerifyOrderingL(const HOrdering& aOrder)
       
   643 	{
       
   644 // this performs a full O(N*N) comparison of the record set
       
   645 	if (iKeys.Count()==0)
       
   646 		return;
       
   647 	TKey* const* data=&iKeys[0];
       
   648 	for (TInt ii=iKeys.Count();--ii>=0;)
       
   649 		{
       
   650 		for (TInt jj=iKeys.Count();--jj>=0;)
       
   651 			{
       
   652 			TInt rr=aOrder.CompareL(data[ii]->iKey,data[jj]->iKey);
       
   653 			if (ii==jj)
       
   654 				__ASSERT_ALWAYS(rr==0,User::Invariant());
       
   655 			else if (ii<jj)
       
   656 				__ASSERT_ALWAYS(rr<=0,User::Invariant());
       
   657 			else
       
   658 				__ASSERT_ALWAYS(rr>=0,User::Invariant());
       
   659 			}
       
   660 		}
       
   661 	}
       
   662 #endif
       
   663 
       
   664 // Class CDbOrderByStage
       
   665 
       
   666 inline const RDbTableRow& CDbOrderByStage::Row()
       
   667 	{return iRow;}
       
   668 inline CDbOrderByStage::CKeys& CDbOrderByStage::Keys()
       
   669 	{__ASSERT(iKeys);return *iKeys;}
       
   670 inline const CDbOrderByStage::HOrdering& CDbOrderByStage::Order()
       
   671 	{__ASSERT(iOrder);return *iOrder;}
       
   672 
       
   673 CDbOrderByStage::CDbOrderByStage(const RDbTableRow& aRow)
       
   674 	: CDbBasicWindowStage(KDbUnlimitedWindow),iRow(aRow)
       
   675 	{
       
   676 	__ASSERT(iState==EReset);
       
   677 	}
       
   678 
       
   679 CDbOrderByStage::~CDbOrderByStage()
       
   680 	{
       
   681 	delete iOrder;
       
   682 	delete iKeys;
       
   683 	}
       
   684 
       
   685 void CDbOrderByStage::ConstructL(const CDbKey& aOrderBy)
       
   686 //
       
   687 // Build the key structures to support the partial and complete ordering
       
   688 //
       
   689 	{
       
   690 	iOrder=HOrdering::NewL(Row().Table(),aOrderBy);
       
   691 	}
       
   692 
       
   693 void CDbOrderByStage::Reset()
       
   694 //
       
   695 // Reset the window to initial state
       
   696 //
       
   697 	{
       
   698 	delete iKeys;
       
   699 	iKeys=0;
       
   700 	iState=EReset;
       
   701 	CDbBasicWindowStage::Reset();
       
   702 	}
       
   703 
       
   704 TBool CDbOrderByStage::ReadL(TInt& aWork,TDbPosition aNext)
       
   705 //
       
   706 // Read some more record keys
       
   707 //
       
   708 	{
       
   709 	TDbRecordId id(KDbNullRecordIdValue);
       
   710 	TGoto r;
       
   711 	while ((r=CDbDataStage::GotoL(aWork,aNext,id))==ESuccess)
       
   712 		{
       
   713 		CDbDataStage::ReadRowL(id);
       
   714 		Keys().AddL(id,Row(),Order());
       
   715 		aNext=EDbNext;
       
   716 		}
       
   717 	switch (r)
       
   718 		{
       
   719 	default:
       
   720 		__ASSERT(0);
       
   721 	case ESynchFailure:
       
   722 		__LEAVE(KErrNotReady);
       
   723 	case EExhausted:
       
   724 		return ETrue;
       
   725 	case ENoRow:
       
   726 		return EFalse;
       
   727 		}
       
   728 	}
       
   729 
       
   730 TBool CDbOrderByStage::DoEvaluateL(TInt& aWork)
       
   731 //
       
   732 // Build the key collection, and finally sort it
       
   733 //
       
   734 	{
       
   735 	TState s=iState;
       
   736 	iState=EFailed;
       
   737 	switch (s)
       
   738 		{
       
   739 	default:
       
   740 		__ASSERT(0);
       
   741 	case EFailed:
       
   742 		__LEAVE(KErrNotReady);
       
   743 	case EReset:
       
   744 		__ASSERT(!iKeys);
       
   745 		iKeys=new(ELeave) CKeys(Order().MaxSize());
       
   746 		// drop through to EEvaluate
       
   747 	case EEvaluate:
       
   748 		if (ReadL(aWork,s==EReset ? EDbFirst : EDbNext))
       
   749 			{
       
   750 			iState=EEvaluate;
       
   751 			return ETrue;
       
   752 			}
       
   753 		// drop through to ESort
       
   754 	case ESort:
       
   755 		Keys().SortL(Order());
       
   756 #ifdef _CHECK_ORDERING
       
   757 		Keys().VerifyOrderingL(Order());
       
   758 #endif
       
   759 		// drop through to EAdd
       
   760 	case EAdd:
       
   761 		Keys().GetRecordsL(iRecords);
       
   762 		delete iKeys;
       
   763 		iKeys=0;
       
   764 		// drop through to EComplete
       
   765 	case EComplete:
       
   766 		iState=EComplete;
       
   767 		return EFalse;
       
   768 		}
       
   769 	}
       
   770 
       
   771 TBool CDbOrderByStage::Unevaluated()
       
   772 //
       
   773 // Return whether it is worth Evaluating
       
   774 //
       
   775 	{
       
   776 	if (iState==EComplete)
       
   777 		return CDbDataStage::Unevaluated();
       
   778 	return iState!=EFailed;
       
   779 	}
       
   780 
       
   781 
       
   782 // Class CDbReorderWindowStage
       
   783 
       
   784 CDbReorderWindowStage::CDbReorderWindowStage()
       
   785 	: CDbBasicWindowStage(KDbUnlimitedWindow),iRows(EGranularity)
       
   786 	{
       
   787 	__ASSERT(iState==EReset);
       
   788 	}
       
   789 
       
   790 CDbReorderWindowStage::~CDbReorderWindowStage()
       
   791 	{
       
   792 	iRows.Close();
       
   793 	}
       
   794 
       
   795 void CDbReorderWindowStage::Reset()
       
   796 //
       
   797 // Reset the window to initial state
       
   798 //
       
   799 	{
       
   800 	iRows.Reset();
       
   801 	iState=EReset;
       
   802 	CDbBasicWindowStage::Reset();
       
   803 	}
       
   804 
       
   805 TBool CDbReorderWindowStage::ReadL(TInt& aWork,TDbPosition aNext)
       
   806 //
       
   807 // Read some more row ids
       
   808 //
       
   809 	{
       
   810 	TDbRecordId id(KDbNullRecordIdValue);
       
   811 	TGoto r;
       
   812 	while ((r=CDbDataStage::GotoL(aWork,aNext,id))==ESuccess)
       
   813 		{
       
   814 		__LEAVE_IF_ERROR(iRows.Append(id.Value()));
       
   815 		aNext=EDbNext;
       
   816 		}
       
   817 	switch (r)
       
   818 		{
       
   819 	default:
       
   820 		__ASSERT(0);
       
   821 	case ESynchFailure:
       
   822 		__LEAVE(KErrNotReady);
       
   823 	case EExhausted:
       
   824 		return ETrue;
       
   825 	case ENoRow:
       
   826 		return EFalse;
       
   827 		}
       
   828 	}
       
   829 
       
   830 TBool CDbReorderWindowStage::DoEvaluateL(TInt& aWork)
       
   831 //
       
   832 // Build the key collection, and finally sort it
       
   833 //
       
   834 	{
       
   835 	TState s=iState;
       
   836 	iState=EFailed;
       
   837 	switch (s)
       
   838 		{
       
   839 	default:
       
   840 		__ASSERT(0);
       
   841 	case EFailed:
       
   842 		__LEAVE(KErrNotReady);
       
   843 	case EReset:
       
   844 	case EEvaluate:
       
   845 		if (ReadL(aWork,s==EReset ? EDbFirst : EDbNext))
       
   846 			{
       
   847 			iState=EEvaluate;
       
   848 			return ETrue;
       
   849 			}
       
   850 		if (iRows.Count())
       
   851 			{
       
   852 			iRows.Sort();
       
   853 			iRecords.AppendL((const TDbRecordId*)&iRows[0],iRows.Count());
       
   854 			iRows.Reset();
       
   855 			}
       
   856 	case EComplete:
       
   857 		iState=EComplete;
       
   858 		return EFalse;
       
   859 		}
       
   860 	}
       
   861 
       
   862 TBool CDbReorderWindowStage::Unevaluated()
       
   863 //
       
   864 // Return whether it is worth Evaluating
       
   865 //
       
   866 	{
       
   867 	if (iState==EComplete)
       
   868 		return CDbDataStage::Unevaluated();
       
   869 	return iState!=EFailed;
       
   870 	}
       
   871