`

JMF 利用RTPManager进行视频音频流传输聊天

    博客分类:
  • JMF
 
阅读更多

 转:http://blog.csdn.net/maydie1989/article/details/6820482

http://download.csdn.net/detail/maydie1989/3636156

JMF(Java Media Framework)想必大家都不陌生,它为音频和视频等媒体内容的捕获、回放、传输和编码转换等提供了一个统一的架构。视频传输的大致思路如下:

其中有几个常用到的类:● 数据源(DataSource) ● 媒体定位器(MediaLocator) ● 播放器(Player)

● 处理器(Processor) ● 管理器(Manager) ● RTP管理器(RTPManager)

发送端的一般流程是:首先获得视频、音频设备实例(CaptureDeviceInfo)-> 然后得到 DataSource -> 创建 Processor -> 得到 Processor 的输出 DataSource,最后通过 RTPManager 以 stream 的形式发出。

而接收端则是通过RTPManager得到ReceiveStream -> 还原后通过Player进行播放

发送端RTPTransmiter

  1. public class RTPTransmiter  
  2. {  
  3.     private Processor processor;  
  4.     private Integer lock = new Integer(0);  
  5.     private DataSource dataOutput;  
  6.     private boolean failed = false;  
  7.     private RTPManager[] rtpMgrs;  
  8.     private String ipAddress;  
  9.     private int portBase;  
  10.     private int destPortBase; //接收端 基础端口    
  11.     public RTPTransmiter(String ipAddress,int portBase,int destPortBase) throws Exception  
  12.     {  
  13.     this.ipAddress = ipAddress;  
  14.     this.portBase = portBase;  
  15.     this.destPortBase = destPortBase;  
  16.     createProcessor() ;  
  17.     createTransmitter() ;  
  18.     processor.start();  
  19.     }  
  20.       
  21.     private void createProcessor() throws Exception   
  22.     {         
  23.     Vector<?> devices = CaptureDeviceManager.getDeviceList(null);  
  24.     CaptureDeviceInfo diVideo = null;  
  25.     CaptureDeviceInfo diAudio = null;  
  26.               
  27.         if (devices.size() > 2)   
  28.         {  
  29.             diVideo = (CaptureDeviceInfo) devices.elementAt(2);//2是视频捕捉器   
  30.              diAudio = (CaptureDeviceInfo) devices.elementAt(0);  
  31.               DataSource[] dataSources = new DataSource[2];  
  32.             dataSources[1] = Manager.createDataSource(diVideo.getLocator());  
  33.             dataSources[0] = Manager.createDataSource(diAudio.getLocator());  
  34.                     processor = Manager.createProcessor(Manager.createMergingDataSource(dataSources));//合并video和audio的 datasource                  
  35.     }  
  36.     if(!waitForState(processor, Processor.Configured)) //configure processor   
  37.         throw new Exception("cant configured processor");  
  38.     ContentDescriptor cd = new ContentDescriptor(ContentDescriptor.RAW_RTP);   
  39.     processor.setContentDescriptor(cd);  
  40.     if(!waitForState(processor, Controller.Realized)) //realize processor   
  41.         throw new Exception("cant realized processor");  
  42.     setJPEGQuality(processor, 0.5f);//设置图像质量 百分比    
  43.     dataOutput = processor.getDataOutput();  
  44.     }  
  45.       
  46.     private void createTransmitter()   
  47.     {  
  48.     PushBufferDataSource pbds = (PushBufferDataSource)dataOutput;  
  49.     PushBufferStream pbss[] = pbds.getStreams();  
  50.     System.out.println("pbss.length is: "+pbss.length);  
  51.           
  52.     rtpMgrs = new RTPManager[pbss.length];//每一个通道由一个RTPManager负责   
  53.     SessionAddress localAddr, destAddr;  
  54.     InetAddress ipAddr;  
  55.     SendStream sendStream;  
  56.     int port;  
  57.     for (int i = 0; i < pbss.length; i++)   
  58.     {  
  59.          try   
  60.               {  
  61.         rtpMgrs[i] = RTPManager.newInstance();        
  62.         port = portBase + 2*i;  
  63.         int clientPort = destPortBase +2*i;  
  64.                    
  65.         ipAddr = InetAddress.getByName(ipAddress);  
  66.           
  67.         localAddr = new SessionAddress( InetAddress.getLocalHost(),port); //本机测 所以IP都是InetAddress.getLocalHost()   
  68.                   
  69.         destAddr = new SessionAddress( InetAddress.getLocalHost(), clientPort);  
  70.           
  71.         rtpMgrs[i].initialize( localAddr);  
  72.                   
  73.         rtpMgrs[i].addTarget( destAddr);  
  74.                   
  75.         System.err.println( "Created RTP session: " + ipAddress + " " + port);  
  76.                    
  77.         sendStream = rtpMgrs[i].createSendStream(dataOutput, i);          
  78.         sendStream.start(); //开始传输   
  79.          }   
  80.               catch (Exception  e)   
  81.          {  
  82.         e.printStackTrace();  
  83.          }  
  84.         }  
  85.    
  86.     }  
  87.   
  88.       
  89.     void setJPEGQuality(Player p, float val)   
  90.     {  
  91.         Control cs[] = p.getControls();  
  92.         QualityControl qc = null;  
  93.         VideoFormat jpegFmt = new VideoFormat(VideoFormat.JPEG);  
  94.    
  95.         for (int i = 0; i < cs.length; i++)   
  96.         {  
  97.             if (cs[i] instanceof QualityControl && cs[i] instanceof Owned)   
  98.             {  
  99.                 Object owner = ((Owned)cs[i]).getOwner();  
  100.                 if (owner instanceof Codec)   
  101.                 {  
  102.                     Format fmts[] = ((Codec)owner).getSupportedOutputFormats(null);  
  103.                     for (int j = 0; j < fmts.length; j++)   
  104.                     {  
  105.                         if (fmts[j].matches(jpegFmt))   
  106.                         {  
  107.                             qc = (QualityControl)cs[i];  
  108.                                 qc.setQuality(val);  
  109.                             System.err.println("- Setting quality to " +   
  110.                                 val + " on " + qc);  
  111.                             break;  
  112.                         }  
  113.                     }  
  114.                 }  
  115.                 if (qc != null)  
  116.                     break;  
  117.             }  
  118.         }  
  119.     }  
  120.       
  121.     private synchronized boolean waitForState(Processor p, int state)   
  122.     {  
  123.         p.addControllerListener(new StateListener());  
  124.         failed = false;  
  125.       
  126.         if (state == Processor.Configured)   
  127.         {  
  128.             p.configure();  
  129.         }   
  130.         else if (state == Processor.Realized)   
  131.         {  
  132.             p.realize();  
  133.         }  
  134.    
  135.         while (p.getState() < state && !failed)   
  136.         {  
  137.             synchronized (lock)   
  138.             {  
  139.                 try   
  140.                 {  
  141.                     lock.wait();  
  142.                 }   
  143.                 catch (InterruptedException ie)   
  144.                 {  
  145.                     return false;  
  146.                 }  
  147.             }  
  148.         }  
  149.       
  150.         if (failed)  
  151.             return false;  
  152.         else  
  153.             return true;  
  154.     }  
  155.       
  156.     class StateListener implements ControllerListener   
  157.     {  
  158.         public void controllerUpdate(ControllerEvent ce)   
  159.         {  
  160.             if (ce instanceof ControllerClosedEvent)  
  161.                 failed = true;  
  162.   
  163.             if (ce instanceof ControllerEvent) {  
  164.                 synchronized (lock) {  
  165.                     lock.notifyAll();  
  166.                 }  
  167.             }  
  168.         }  
  169.    }  
  170. }  
/**
 * Sends captured audio and video to a remote peer over RTP using JMF.
 *
 * <p>The constructor builds the whole pipeline: it merges one audio and one
 * video capture device into a single {@code Processor}, configures the
 * processor for {@code RAW_RTP} output, opens one {@code RTPManager} session
 * per output track and finally starts the processor.
 *
 * <p>Track {@code i} is sent from local port {@code portBase + 2*i} to port
 * {@code destPortBase + 2*i} on the destination host.
 */
public class RTPTransmiter
{
    /** Position of the audio capture device in the list returned by CaptureDeviceManager. */
    private static final int AUDIO_DEVICE_INDEX = 0;
    /** Position of the video capture device in the list returned by CaptureDeviceManager. */
    private static final int VIDEO_DEVICE_INDEX = 2;

    /**
     * Monitor guarding {@link #failed} and used to signal processor state
     * changes from the controller listener to {@link #waitForState}.
     * A dedicated object is used so nothing else can synchronize on it.
     */
    private final Object lock = new Object();

    private Processor processor;
    private DataSource dataOutput;
    /** Set when the processor reported a ControllerClosedEvent; guarded by {@link #lock}. */
    private boolean failed = false;
    private RTPManager[] rtpMgrs;
    private String ipAddress;
    private int portBase;
    private int destPortBase; // base port on the receiving side

    /**
     * Builds the capture/transmit pipeline and starts sending.
     *
     * @param ipAddress    host name or IP address of the receiver
     * @param portBase     local base port; track i uses {@code portBase + 2*i}
     * @param destPortBase receiver base port; track i targets {@code destPortBase + 2*i}
     * @throws Exception if the capture devices are missing or the processor
     *                   cannot be configured or realized
     */
    public RTPTransmiter(String ipAddress, int portBase, int destPortBase) throws Exception
    {
        this.ipAddress = ipAddress;
        this.portBase = portBase;
        this.destPortBase = destPortBase;
        createProcessor();
        createTransmitter();
        processor.start();
    }

    /**
     * Creates a processor over the merged audio and video capture sources,
     * drives it to the Realized state with RAW_RTP output and stores its
     * output data source in {@link #dataOutput}.
     *
     * @throws Exception if fewer than three capture devices are registered, or
     *                   the processor fails to reach Configured or Realized
     */
    private void createProcessor() throws Exception
    {
        Vector<?> devices = CaptureDeviceManager.getDeviceList(null);

        // Fail with a clear message instead of passing a null processor on.
        if (devices == null || devices.size() <= VIDEO_DEVICE_INDEX)
        {
            int found = (devices == null) ? 0 : devices.size();
            throw new Exception("expected at least " + (VIDEO_DEVICE_INDEX + 1)
                    + " capture devices but found " + found);
        }

        CaptureDeviceInfo diVideo = (CaptureDeviceInfo) devices.elementAt(VIDEO_DEVICE_INDEX);
        CaptureDeviceInfo diAudio = (CaptureDeviceInfo) devices.elementAt(AUDIO_DEVICE_INDEX);

        DataSource[] dataSources = new DataSource[2];
        dataSources[0] = Manager.createDataSource(diAudio.getLocator());
        dataSources[1] = Manager.createDataSource(diVideo.getLocator());
        // Merge the audio and video sources into a single data source.
        processor = Manager.createProcessor(Manager.createMergingDataSource(dataSources));

        if (!waitForState(processor, Processor.Configured))
        {
            throw new Exception("cant configured processor");
        }
        // The content descriptor is set between Configured and Realized.
        ContentDescriptor cd = new ContentDescriptor(ContentDescriptor.RAW_RTP);
        processor.setContentDescriptor(cd);
        if (!waitForState(processor, Controller.Realized))
        {
            throw new Exception("cant realized processor");
        }
        setJPEGQuality(processor, 0.5f);
        dataOutput = processor.getDataOutput();
    }

    /**
     * Opens one RTP session per output stream of the processor and starts
     * sending on each of them. A failure on one track is reported on stderr
     * and does not prevent the remaining tracks from being set up.
     */
    private void createTransmitter()
    {
        PushBufferDataSource pbds = (PushBufferDataSource) dataOutput;
        PushBufferStream[] pbss = pbds.getStreams();
        System.out.println("pbss.length is: " + pbss.length);

        // One RTPManager is responsible for each track.
        rtpMgrs = new RTPManager[pbss.length];
        for (int i = 0; i < pbss.length; i++)
        {
            try
            {
                rtpMgrs[i] = RTPManager.newInstance();
                int port = portBase + 2 * i;
                int clientPort = destPortBase + 2 * i;

                // Send to the host given to the constructor; the original
                // resolved it but then always targeted the local host.
                InetAddress ipAddr = InetAddress.getByName(ipAddress);
                SessionAddress localAddr = new SessionAddress(InetAddress.getLocalHost(), port);
                SessionAddress destAddr = new SessionAddress(ipAddr, clientPort);

                rtpMgrs[i].initialize(localAddr);
                rtpMgrs[i].addTarget(destAddr);

                System.err.println("Created RTP session: " + ipAddress + " " + port);

                SendStream sendStream = rtpMgrs[i].createSendStream(dataOutput, i);
                sendStream.start();
            }
            catch (Exception e)
            {
                // Best effort: keep going with the other tracks.
                System.err.println("Failed to start RTP session for track " + i);
                e.printStackTrace();
            }
        }
    }

    /**
     * Sets the quality on the first QualityControl that is owned by a codec
     * able to produce JPEG output. Does nothing if no such control exists.
     *
     * @param p   player (or processor) whose controls are searched
     * @param val quality value handed to {@code QualityControl.setQuality}
     */
    void setJPEGQuality(Player p, float val)
    {
        Control[] controls = p.getControls();
        VideoFormat jpegFmt = new VideoFormat(VideoFormat.JPEG);

        for (int i = 0; i < controls.length; i++)
        {
            if (!(controls[i] instanceof QualityControl) || !(controls[i] instanceof Owned))
            {
                continue;
            }
            Object owner = ((Owned) controls[i]).getOwner();
            if (!(owner instanceof Codec))
            {
                continue;
            }
            Format[] fmts = ((Codec) owner).getSupportedOutputFormats(null);
            for (int j = 0; j < fmts.length; j++)
            {
                if (fmts[j].matches(jpegFmt))
                {
                    QualityControl qc = (QualityControl) controls[i];
                    qc.setQuality(val);
                    System.err.println("- Setting quality to " + val + " on " + qc);
                    return;
                }
            }
        }
    }

    /**
     * Asks the processor to configure or realize and blocks until it has
     * reached at least the requested state.
     *
     * @param p     processor to drive
     * @param state target state, {@code Processor.Configured} or {@code Processor.Realized}
     * @return {@code true} once the state is reached; {@code false} if the
     *         processor was closed or the waiting thread was interrupted
     */
    private synchronized boolean waitForState(Processor p, int state)
    {
        StateListener listener = new StateListener();
        p.addControllerListener(listener);
        try
        {
            synchronized (lock)
            {
                failed = false;
            }

            if (state == Processor.Configured)
            {
                p.configure();
            }
            else if (state == Processor.Realized)
            {
                p.realize();
            }

            // The state check and the wait share one synchronized block so a
            // notification cannot slip in between them and be lost.
            synchronized (lock)
            {
                while (p.getState() < state && !failed)
                {
                    try
                    {
                        lock.wait();
                    }
                    catch (InterruptedException ie)
                    {
                        // Keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
                return !failed;
            }
        }
        finally
        {
            // Avoid stacking up one extra listener per call.
            p.removeControllerListener(listener);
        }
    }

    /**
     * Wakes up {@link #waitForState} on every controller event and records a
     * failure when the controller was closed.
     */
    class StateListener implements ControllerListener
    {
        @Override
        public void controllerUpdate(ControllerEvent ce)
        {
            synchronized (lock)
            {
                if (ce instanceof ControllerClosedEvent)
                {
                    failed = true;
                }
                lock.notifyAll();
            }
        }
    }
}

RTPReceiver 核心部分:需要为每个 RTPManager 添加 ReceiveStreamListener 和 SessionListener,并在 ReceiveStreamListener 的 NewReceiveStreamEvent 事件中添加 ControllerListener。

  1. mgrs[i] = (RTPManager) RTPManager.newInstance();  
  2. //添加 SessionListener   
  3. mgrs[i].addSessionListener(this);  
  4. //添加 ReceiveStreamListener   
  5. mgrs[i].addReceiveStreamListener(this);  
  6.                   
  7. //得到服务器端IP地址   
  8. ipAddr = InetAddress.getByName("sender ip");  
  9. localAddr= new SessionAddress( InetAddress.getLocalHost(), sendPort);  
  10. destAddr = new SessionAddress( ipAddr, sendPort);                 
  11. mgrs[i].initialize( localAddr);  
  12. BufferControl bc = (BufferControl)mgrs[i].getControl("javax.media.control.BufferControl");  
  13. if (bc != null)  
  14.     bc.setBufferLength(350);  
  15. mgrs[i].addTarget(destAddr);  
// Receiver-side setup of the RTP manager stored at index i.
mgrs[i] = (RTPManager) RTPManager.newInstance();
// Register this object as SessionListener
mgrs[i].addSessionListener(this);
// Register this object as ReceiveStreamListener
mgrs[i].addReceiveStreamListener(this);
				
// Resolve the IP address of the sending (server) side
// NOTE(review): "sender ip" looks like a placeholder for the real host - confirm
ipAddr = InetAddress.getByName("sender ip");
// Local and destination addresses both use the same port number, sendPort.
localAddr= new SessionAddress( InetAddress.getLocalHost(), sendPort);
destAddr = new SessionAddress( ipAddr, sendPort);				
mgrs[i].initialize( localAddr);
// The buffer control is optional; its length is only set when the manager exposes one.
// NOTE(review): 350 is presumably milliseconds - confirm against the BufferControl docs
BufferControl bc = (BufferControl)mgrs[i].getControl("javax.media.control.BufferControl");
if (bc != null)
    bc.setBufferLength(350);
mgrs[i].addTarget(destAddr);

关于ReceiveStreamListener

  1. @Override  
  2. public void update(ReceiveStreamEvent evt)  
  3. {  
  4.     //从大到小进行定位,RTPManager->Participant->ReceiveStream   
  5.     RTPManager mgr = (RTPManager)evt.getSource();    
  6.     Participant participant = evt.getParticipant();  
  7.     ReceiveStream stream = evt.getReceiveStream();        
  8.     if (evt instanceof RemotePayloadChangeEvent){.....};//远程发送者改变负载    
  9.     else if(evt instanceof NewReceiveStreamEvent){.....需要添加ControllerListener};//接收到新的stream       
  10.     else if (evt instanceof ByeEvent){.....};//结束      
  11. }  
	/**
	 * ReceiveStreamListener callback: dispatches on the concrete event type.
	 * The branch bodies are elided ("{.....}") in this outline.
	 * NOTE(review): as written, the ';' after each '}' before 'else' would not
	 * compile - this is an outline, not finished code.
	 */
	@Override
	public void update(ReceiveStreamEvent evt)
	{
    	// Locate from coarse to fine: RTPManager -> Participant -> ReceiveStream
		RTPManager mgr = (RTPManager)evt.getSource();  
		Participant participant = evt.getParticipant();
		ReceiveStream stream = evt.getReceiveStream(); 		
		if (evt instanceof RemotePayloadChangeEvent){.....};// the remote sender changed its payload
		else if(evt instanceof NewReceiveStreamEvent){.....需要添加ControllerListener};// a new stream was received; a ControllerListener needs to be added here
		else if (evt instanceof ByeEvent){.....};// the stream ended
	}

关于SessionListener

  1. @Override  
  2. public void update(SessionEvent evt)  
  3. {  
  4.              //有新的用户加入   
  5.     if (evt instanceof NewParticipantEvent)   
  6.     {  
  7.         Participant p = ((NewParticipantEvent)evt).getParticipant();              
  8.     }  
  9. }  
	/**
	 * SessionListener callback. Only new-participant events are handled:
	 * the joining participant is looked up from the event.
	 *
	 * @param evt session event delivered by the RTP manager
	 */
	@Override
	public void update(SessionEvent evt)
	{
		// Anything other than a new participant is ignored.
		if (!(evt instanceof NewParticipantEvent))
		{
			return;
		}

		// A new user joined the session.
		NewParticipantEvent joinEvent = (NewParticipantEvent) evt;
		Participant newcomer = joinEvent.getParticipant();
	}

关于ControllerListener

  1. @Override  
  2. public void controllerUpdate(ControllerEvent ce)  
  3. {  
  4.     if (ce instanceof RealizeCompleteEvent){......};//player完成了realize     
  5.     if (ce instanceof ControllerErrorEvent){......};//出现异常         
  6. }  
	/**
	 * ControllerListener callback for the player.
	 * The branch bodies are elided ("{......}") in this outline.
	 */
	@Override
	public void controllerUpdate(ControllerEvent ce)
	{
		if (ce instanceof RealizeCompleteEvent){......};// the player finished realizing
		if (ce instanceof ControllerErrorEvent){......};// an error occurred
	}

JMF 的 RTP 传输是基于 UDP 实现的。
ps:话说 百度的题好难啊 不如腾讯的友好 哈哈哈

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics