realtimenetprots/rtp/cfrtp/src/rtpflow.cpp
author hgs
Thu, 21 Oct 2010 15:53:21 +0530
changeset 55 36ea1f90a3d8
parent 0 307788aac0a8
permissions -rw-r--r--
201041_01

// Copyright (c) 2008-2009 Nokia Corporation and/or its subsidiary(-ies).
// All rights reserved.
// This component and the accompanying materials are made available
// under the terms of "Eclipse Public License v1.0"
// which accompanies this distribution, and is available
// at the URL "http://www.eclipse.org/legal/epl-v10.html".
//
// Initial Contributors:
// Nokia Corporation - initial contribution.
//
// Contributors:
//
// Description:
// RTPFlow Implementation
// 
//

/**
 @file
 @internalComponent
*/

#include <comms-infras/ss_log.h>

#include "rtpflow.h"
#include "rtppint.h"
#include "rtp_cfmessages.h"


#if defined(ESOCK_LOGGING_ACTIVE)
_LIT8(KRtpFlowSubTag, "RtpFlow");
#endif

									
CRtpFlow::CRtpFlow(ESock::CSubConnectionFlowFactoryBase& aFactory, const Messages::TNodeId& aSubConnId, ESock::CProtocolIntfBase* aProtocolIntf)
:CRtpBaseFlow(aFactory, aSubConnId, aProtocolIntf)
{
	LOG_NODE_CREATE(KESockFlowTag, CRtpFlow);
}

CRtpFlow* CRtpFlow::NewL(CSubConnectionFlowFactoryBase& aFactory, const Messages::TNodeId& aSubConn, CProtocolIntfBase* aProtocolIntf)
	{
	CRtpFlow *pRtpFlow = new (ELeave) CRtpFlow(aFactory, aSubConn, aProtocolIntf);
	
	CleanupStack::PushL(pRtpFlow);
	pRtpFlow->ConstructL();
	/* Register the flow with Rtp Pint */
	((CRtpProtocolIntf*)aProtocolIntf)->RegisterSubConnProvIDL(pRtpFlow,aSubConn);
	CleanupStack::Pop();

	return pRtpFlow;
	}

void CRtpFlow::ConstructL()
	{
	TSockAddr addr;
	/* Create the RTP Handler */
	iRtpHandler = CCFRtpHandler::NewL(this,EPriorityNormal);
	iRtpSession = CRtpController::NewL(*iRtpHandler,EPriorityNormal,addr, _L8(""));
	iRtpBuffer.ReAllocL(KRtpDefBufSz);
	}


CRtpFlow::~CRtpFlow()
	{
	delete iRtpSession;
	iRtpBuffer.Close();
	delete iRtpHandler;
	LOG_NODE_DESTROY(KESockFlowTag, CRtpFlow);
	}
	
void CRtpFlow::CanSend()
	{
	__ASSERT_DEBUG(iSessionDataNotify,User::Panic(KRtpFlowPanic, RtpFlowPanics::KPanicNoControlProvider));
	CRtpBaseFlow::CanSend();
	}

TInt CRtpFlow::Write(RMBufChain& aData, TUint aOptions, TSockAddr* anAddr )
	{

	TInt nwr = -1;
	TInt ret = KErrNone;
	/* Findout the amount of Data contained in the RMbuf Chain */
	TInt rmbuflen = aData.Length();
	TPtr8 rtpBufPtr(NULL, 0);
	
	LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tWrite() Len=%d"), this, rmbuflen));
	
	if(!(aData.First()->Next()))
		{
		RMBuf *buffer = aData.First();
		__ASSERT_DEBUG(rmbuflen == buffer->Length() ,User::Panic(KRtpFlowPanic, RtpFlowPanics::KPanicIncompleteBuffer));
		rtpBufPtr.Set(buffer->Ptr(), rmbuflen, rmbuflen);
		}
	else
		{
		if(iRtpBuffer.MaxLength() < rmbuflen)
			{
			iRtpBuffer.SetLength(0);
			ret = iRtpBuffer.ReAlloc(rmbuflen);
		
			if(KErrNone != ret)
				{
				LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tWrite() -ALLOC Failed!"), this));
				/* In case of error a protocol is supposed to Error and then return 0 */
				iSessionControlNotify->Error(ret,MSessionControlNotify::EErrorSend);
				return 0;
				}
			}

		/* Copy the Data from RMbuf to Descriptor. Then Pass it to the RtpStack*/
		iRtpBuffer.SetLength(rmbuflen);
		aData.CopyOut(iRtpBuffer);
		rtpBufPtr.Set(&iRtpBuffer[0], rmbuflen, iRtpBuffer.MaxLength());
		}
	
	if(!NonRtpSendData())
		{
		iRtpSession->PacketBeingSent(rtpBufPtr, ERtp); 
		}
	//It is assumed that a Properly Created packet is passed down here.
	//Maybe we should handle a return code for PacketbeingSent function.
	if(aData.First()->Next())
		{
		aData.CopyIn(iRtpBuffer);
		}
		 
	__ASSERT_DEBUG(iSSPData,User::Panic(KRtpFlowPanic, RtpFlowPanics::KPanicNoServiceProvider));
	nwr = iSSPData->Write(aData, aOptions, anAddr);

	LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tWrite() Done nwr=%d"), this,nwr));	

	return nwr;
	
	}
 
 
void CRtpFlow::ReceivedL(const Messages::TRuntimeCtxId& aSender, const Messages::TNodeId& aRecipient, Messages::TSignatureBase& aMessage)
    {
	CRtpBaseFlow::ReceivedL(aSender,aRecipient,aMessage);

    LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tReceivedL() Msg %d"), this, aMessage.MessageId().MessageId()));

	if ( aMessage.IsMessage<Messages::TEBase::TError>() )
		{
        LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tGot TError from Sunconn"), this));
        HandleError(static_cast<Messages::TEBase::TError&>(aMessage).iValue);
		}	
    else if (Messages::TEChild::ERealmId == aMessage.MessageId().Realm())
        {
    	switch (aMessage.MessageId().MessageId())
        	{
   		case Messages::TEChild::TDestroy::EId :
    			{
            	Destroy();
            	break;
    			}
        	}
		}
	else if (TCFDataClient::ERealmId == aMessage.MessageId().Realm())
		{
	    switch (aMessage.MessageId().MessageId())
        	{
        	case TCFDataClient::TProvisionConfig::EId:
         		{	
    			TCFDataClient::TProvisionConfig& aMess = Messages::message_cast<TCFDataClient::TProvisionConfig>(aMessage);
				
				iAccessPointConfig.Close();
				iAccessPointConfig.Open(aMess.iConfig);
				
				const CRtpProvisionConfig& rtpProv = static_cast<const CRtpProvisionConfig&>(AccessPointConfig().FindExtensionL(STypeId::CreateSTypeId(KRtpProvisionUid, KRtpProvisionConfigType)));
				DoProvisionL(&rtpProv);
			
				/* Deafult Processing */
				HandleProvisionMessage(aMessage);				
        		break;
         		}
 
	        case TCFDataClient::TStart::EId :
        		{
	        	/* Check if we Have a Bearer. If we donot Have
	        	   complete when we Have one */
        		User::LeaveIfError(iStartRequest.Open(iSubConnectionProvider,aSender));
	        	if(iSSPData)
	        		{
	        		CompleteStart(KErrNone);
	        		}
	        	else
	        		{
	        		LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tGot TDataClientStart no SP iIsStarting=>ETrue"), this));
	        		iIsStarting = ETrue;		
	        		}
	        	}
	            break;

	         case TCFDataClient::TStop::EId :
	         	{
	        	StopFlow(static_cast<TCFDataClient::TStop&>(aMessage));
	            break;
	         	}
	         
	         case TCFDataClient::TBindTo::EId :
				{
				TCFDataClient::TBindTo& bindToMsg(static_cast<TCFDataClient::TBindTo&>(aMessage));
                TRAPD(err,BindToL(bindToMsg));
                
                if(err)
                    {
                    Messages::RClientInterface::OpenPostMessageClose(Id(), aSender, Messages::TEBase::TError(TCFDataClient::TBindTo::Id(), err).CRef());
                    }
                else
                    {
                    Messages::RClientInterface::OpenPostMessageClose(Id(), aSender, TCFDataClient::TBindToComplete().CRef());
                    }
                
				//If we have received TDataClientStart before (when we did not yet have a bearer),
				//we complete the start here as well
				if (iIsStarting)
					{
					CompleteStart(err);
					}
				
				HandleBindToMessage(aMessage,err);	
                break;			
				}
 
        	}
		}
	else if (TCFControlProvider::ERealmId == aMessage.MessageId().Realm())
		{
		/* VRAMY : LOG Here!!! */
		}
	}


void CRtpFlow::DoProvisionL(const CRtpProvisionConfig *apRtpProv)    
	{
	iRtpSession->SetSessionBandwidth(apRtpProv->BandWidth());
	iRtpSession->SetRTPTimeConversion(apRtpProv->RtpTimeNow(),apRtpProv->RtptimeConversion());
	iRtpSession->SetRtpStreamParameters(apRtpProv->MaxDropOut(),
										apRtpProv->MaxMisorder(),
										apRtpProv->MinSequential());
	}

TInt CRtpFlow::GetData(RMBufChain& aData, TUint aLength, TUint aOptions, TSockAddr* anAddr)
	{
	ASSERT(iSSPData);
	TInt ret;

	LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tGetData "), this ));

	ret = iSSPData->GetData(aData, aLength, aOptions, anAddr);
	if(ret < 0)
		{
		/* The Read failed. Just pass the error up */
		LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tGetData failed (ret =%d)"), this));
		return ret;
		}
	
	/* Findout the amount of Data contained in the RMbuf Chain */
	TInt rmbuflen = aData.Length();
	TPtr8 rtpBufPtr(NULL, 0);
	
	if(!(aData.First()->Next()))
		{
		RMBuf *buffer = aData.First();
		__ASSERT_DEBUG(rmbuflen == buffer->Length() ,User::Panic(KRtpFlowPanic, RtpFlowPanics::KPanicIncompleteBuffer));
		rtpBufPtr.Set(buffer->Ptr(), rmbuflen, rmbuflen);
		}
	else
		{
		if(iRtpBuffer.MaxLength() < rmbuflen)
			{
			iRtpBuffer.SetLength(0);
			ret = iRtpBuffer.ReAlloc(rmbuflen);
			if(KErrNone != ret)
				{
				return ret;
				}	
			}
	
		iRtpBuffer.SetLength(rmbuflen);
		aData.CopyOut(iRtpBuffer);
		rtpBufPtr.Set(&iRtpBuffer[0], rmbuflen, iRtpBuffer.MaxLength());
		}
	
	if(anAddr)
		{
		iRtpHandler->SetRecvAddress(*anAddr);
		}
	else
		{
		/* Connected Socket */
		__ASSERT_DEBUG(iConnected,User::Panic(KRtpFlowPanic, RtpFlowPanics::KPanicIncorrectState));
		iRtpHandler->SetRecvAddress(iDestAddr);
		}
		
	TRAP(ret,iRtpSession->DoReceiveL(rtpBufPtr,ERtp));
	
	if((KErrNone == ret) && (aData.First()->Next()))
		{
		aData.CopyIn(iRtpBuffer);
		}

	LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tGetData()) done ret=%d"), this,ret));

	return ret;
	}


void CRtpFlow::SendData(TDes8& , TSockAddr &, TInt , TRequestStatus  &aStatus)
	{
		
		TRequestStatus *stat = &aStatus;
		User::RequestComplete(stat,KErrNone);
	}
	

void CRtpFlow::NewData(TUint aCount)
	{
	__ASSERT_DEBUG(iSessionDataNotify,User::Panic(KRtpFlowPanic, RtpFlowPanics::KPanicNoControlProvider));

	LOG( ESockLogExternal::Printf(KESockFlowTag,KRtpFlowSubTag,_L8("CRtpFlow %08x:\tNewData()"), this));

	CRtpBaseFlow::NewData(aCount);
	}
	

/* When an RTCP flow is created call this function to enable RTCP in
   RtpSession. The function will return a Pointer to RTPSession which
   can be passed on to RTCP Flow */
CRtpController* CRtpFlow::GetRtpSession() const
	{
	return 	iRtpSession;
	}

void CRtpFlow::DoCanClose(MSessionControlNotify::TDelete )
/**
Called from protocol to indicate that a graceful close has completed
*/
	{
	/* RTP Session is no longer Valid and Can be Closed now */
	/* OOPS no API to close the RtpSession :(*/
	}


void CRtpFlow::DoUpdateLocalAddress(const TSockAddr& anAddr)
/* Called by BaseFlow to notify that Local address has been 
   Updated. Ideally called only once */
	{
	iRtpSession->SetLocalAddress(anAddr);
	}

TInt CRtpFlow::DoGetOption(TUint level, TUint name, TDes8 &aOption) const
	{
	/* Socket option for RTP */
	if(KSolRtp == level)
		{
		switch(name)
			{
			case KSoLocalSSRC:
				{
				TUint locSSRC = iRtpSession->GetLocalSSRC();
				CopyOptionToDesc(aOption,locSSRC);
				break;
				}
			default:
				{
				return KErrNotSupported;
				}
			}
		return KErrNone;
		}

	if(iSSP)
	 return iSSP->GetOption(level,name,aOption);
	return KErrNone;
	}

TInt CRtpFlow::DoSetOption(TUint level, TUint name, const TDesC8 &aOption)
	{
	
	/* Socket option for RTP */
	if(KSolRtp == level)
		{
		TInt ret = KErrNone;
		switch(name)
			{
			case KSoRtpBandwidth:
				{
				TInt optInt=0;
				GETOPTION_OR_RETURN(aOption,optInt);
				iRtpSession->SetSessionBandwidth(optInt);
				break;
				}
			case KSoStreamParams:
				{
				TRtpStreamParams strmParams;
				GETOPTION_OR_RETURN(aOption,strmParams);
				iRtpSession->SetRtpStreamParameters(strmParams.iMaxDropout,
						                            strmParams.iMaxMisorder,
						                            strmParams.iMinSequential);
				break;
				}

			case KSoRtpTimeConvertion:
				{
				TRtpTimeConversion rtpTimeConv;
				GETOPTION_OR_RETURN(aOption,rtpTimeConv);
				iRtpSession->SetRTPTimeConversion(rtpTimeConv.iRtpTimeNow,
												  rtpTimeConv.iConversion);
				break;
				}
			
			case KSoSendNonRtpData:
				{
				TInt nonRtpData = 0;
				GETOPTION_OR_RETURN(aOption,nonRtpData);
				SetNonRtpSendData(nonRtpData);
				break;	
				}
				
			case KSoReceiveNonRtpData:
				{
				TInt nonRtpData = 0;
				GETOPTION_OR_RETURN(aOption,nonRtpData);
				iRtpSession->SetNonRtpData(nonRtpData);
				break;	
				}
			case KSoSamplingRate:
				{
				TSamplingRateInfo sampRateInfo;
				GETOPTION_OR_RETURN(aOption,sampRateInfo);
				ret = iRtpSession->SetSamplingRate(sampRateInfo.iPayloadType,
											 sampRateInfo.iSamplingRate);
				break;
				}

			default:
				{
				return KErrNotSupported;
				}
			}
		return ret;
		}
	if(iSSP)
		return iSSP->SetOption(level,name,aOption);
	return KErrNone;
	}
	
void CRtpFlow::SetNonRtpSendData(TInt aNonRtpData)
	{
	iNonRtpSendData = aNonRtpData;
	}
	
TInt CRtpFlow::NonRtpSendData()
	{
	return iNonRtpSendData;
	}