List Info

Thread: #94: The RTMP network code should support client mode




#94: The RTMP network code should support client mode
user name
2006-07-26 14:20:29
#94: The RTMP network code should support client mode
---------------------------------+------------------------------------------
Reporter:  cepublishing-etc.de  |       Owner:  Red5 Development Team
    Type:  enhancement           |      Status:  new
Priority:  normal                |   Component:  Red5 Server
 Version:  0.5                   |    Severity:  normal
Keywords:                        |
---------------------------------+------------------------------------------
 For test cases and for connections to other RTMP servers, the network
 code should support both client mode and server mode.

 The following patch allows a client connection using mina.

 {{{
 Index:
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/IRTMPHandler.java
 ===========================================================
========
 ---
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/IRTMPHandler.java
 (revision 0)
 +++
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/IRTMPHandler.java
 (revision 0)
  -0,0 +1,16 
 +package org.red5.server.net.rtmp;
 +
 +import org.red5.server.net.protocol.ProtocolState;
 +import org.red5.server.net.rtmp.codec.RTMP;
 +
 +public interface IRTMPHandler {
 +
 +       public void connectionOpened(RTMPConnection conn,
RTMP state);
 +
 +       public void messageReceived(RTMPConnection conn,
ProtocolState
 state, Object in) throws Exception;
 +
 +       public void messageSent(RTMPConnection conn, Object
message);
 +
 +       public void connectionClosed(RTMPConnection conn,
RTMP state);
 +
 +}
 Index:
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/RTMPHandler.java
 ===========================================================
========
 ---
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/RTMPHandler.java
 (revision 1223)
 +++
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/RTMPHandler.java
 (working copy)
  -51,6 +51,7 
  import org.red5.server.messaging.OOBControlMessage;
  import org.red5.server.net.protocol.ProtocolState;
  import org.red5.server.net.rtmp.codec.RTMP;
 +import org.red5.server.net.rtmp.event.BytesRead;
  import org.red5.server.net.rtmp.event.ChunkSize;
  import org.red5.server.net.rtmp.event.IRTMPEvent;
  import org.red5.server.net.rtmp.event.Invoke;
  -56,7 +57,6 
  import org.red5.server.net.rtmp.event.Invoke;
  import org.red5.server.net.rtmp.event.Notify;
  import org.red5.server.net.rtmp.event.Ping;
 -import org.red5.server.net.rtmp.event.BytesRead;
  import org.red5.server.net.rtmp.event.Unknown;
  import org.red5.server.net.rtmp.message.Constants;
  import org.red5.server.net.rtmp.message.Header;
  -73,7 +73,7 
  import org.red5.server.stream.StreamService;

  public class RTMPHandler
 -       implements Constants, StatusCodes {
 +       implements IRTMPHandler, Constants, StatusCodes {

         protected static Log log =
          LogFactory.getLog(RTMPHandler.class.getName());
  -196,7 +196,11 
                         ((PlaylistSubscriberStream)
 stream).written(sent.getMessage());
                 }
         }
 -
 +
 +       public void connectionOpened(RTMPConnection conn,
RTMP state) {
 +
 +       }
 +
         public void connectionClosed(RTMPConnection conn,
RTMP state) {
                 state.setState(RTMP.STATE_DISCONNECTED);
                 conn.close();
 Index:
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/RTMPMinaIoHandler.java
 ===========================================================
========
 ---
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/RTMPMinaIoHandler.java
 (revision 1203)
 +++
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/RTMPMinaIoHandler.java
 (working copy)
  -37,12 +37,18 
         protected static Log log =
         
LogFactory.getLog(RTMPMinaIoHandler.class.getName());

 -       protected RTMPHandler handler;
 +       protected IRTMPHandler handler;
 +
 +       protected boolean mode = RTMP.MODE_SERVER;

 -       public void setHandler(RTMPHandler handler) {
 +       public void setHandler(IRTMPHandler handler) {
                 this.handler = handler;
         }

 +       public void setMode(boolean mode) {
 +               this.mode=mode;
 +       }
 +
         private ProtocolCodecFactory codecFactory = null;

         public void setCodecFactory(ProtocolCodecFactory
codecFactory) {
  -76,19 +82,34 
                         log.warn("Raw buffer after
handshake, something
 odd going on");
                 }

 -               ByteBuffer out =
 ByteBuffer.allocate((Constants.HANDSHAKE_SIZE*2)+1);
 -
 -               if(log.isDebugEnabled()){
 -                       log.debug("Writing handshake
reply");
 -                       log.debug("handskake
size:"+in.remaining());
 +               if(rtmp.getMode()==RTMP.MODE_SERVER) {
 +                       if(log.isDebugEnabled()){
 +                               log.debug("Handshake
2nd phase");
 +                               log.debug("handskake
 size:"+in.remaining());
 +                       }
 +                       ByteBuffer out =
 ByteBuffer.allocate((Constants.HANDSHAKE_SIZE*2)+1);
 +                       out.put((byte)0x03);
 +                      
out.fill((byte)0x00,Constants.HANDSHAKE_SIZE);
 +                       out.put(in);
 +                       out.flip();
 +                       //in.release();
 +                       session.write(out);
 +               } else {
 +                       if(log.isDebugEnabled()) {
 +                               log.debug("Handshake
3d phase");
 +                               log.debug("handskake
 size:"+in.remaining());
 +                       }
 +                       in.skip(1);
 +                       ByteBuffer out =
 ByteBuffer.allocate(Constants.HANDSHAKE_SIZE);
 +                       int limit=in.limit();
 +                      
in.limit(in.position()+Constants.HANDSHAKE_SIZE);
 +                       out.put(in);
 +                       out.flip();
 +                       in.limit(limit);
 +                       in.skip(Constants.HANDSHAKE_SIZE);
 +                       session.write(out);
 +
                 }
 -
 -               out.put((byte)0x03);
 -              
out.fill((byte)0x00,Constants.HANDSHAKE_SIZE);
 -               out.put(in).flip();
 -               //in.release();
 -               session.write(out);
 -
         }

         public void messageSent(IoSession session, Object
message) throws
 Exception {
  -93,8 +114,16 

         public void messageSent(IoSession session, Object
message) throws
 Exception {
                 log.debug("messageSent");
 +               final RTMP rtmp =
 (RTMP)session.getAttribute(RTMP.SESSION_KEY);
                 final RTMPMinaConnection conn =
(RTMPMinaConnection)
 session.getAttachment();
                 handler.messageSent(conn, message);
 +               if(rtmp.getMode()==RTMP.MODE_CLIENT) {
 +                       if(message instanceof ByteBuffer) {
 +
 if(((ByteBuffer)message).limit()==Constants.HANDSHAKE_SIZE)
{
 +
 handler.connectionOpened((RTMPMinaConnection)session.getAtt
achment(),(RTMP)session.getAttribute(RTMP.SESSION_KEY));
 +                               }
 +                       }
 +               }
         }


  -110,6 +139,17 
                 cfg.setTcpNoDelay(true);
                 super.sessionOpened(session);

 +               RTMP
rtmp=(RTMP)session.getAttribute(RTMP.SESSION_KEY);
 +               if(rtmp.getMode()==RTMP.MODE_CLIENT) {
 +                       if(log.isDebugEnabled()){
 +                               log.debug("Handshake
1st phase");
 +                       }
 +                       ByteBuffer out =
 ByteBuffer.allocate(Constants.HANDSHAKE_SIZE+1);
 +                       out.put((byte)0x03);
 +                      
out.fill((byte)0x00,Constants.HANDSHAKE_SIZE);
 +                       out.flip();
 +                       session.write(out);
 +               }
         }

         public void sessionClosed(IoSession session) throws
Exception {
  -125,7 +165,7 
                         log.debug("Session
created");

                 // moved protocol state from connection
object to rtmp
 object
 -               session.setAttribute(RTMP.SESSION_KEY, new
 RTMP(RTMP.MODE_SERVER));
 +               session.setAttribute(RTMP.SESSION_KEY, new
RTMP(mode));

                 session.getFilterChain().addFirst(
                  "protocolFilter", new
 ProtocolCodecFilter(this.codecFactory) );
 Index:
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/codec/RTMPProtocolEncoder.java
 ===========================================================
========
 ---
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/codec/RTMPProtocolEncoder.java
 (revision 1203)
 +++
 /home/ce/Documents/workspace/Red5_trunk/src/org/red5/server
/net/rtmp/codec/RTMPProtocolEncoder.java
 (working copy)
  -369,8 +369,8 
                         serializer.serialize(output,
action); // seems
 right
                 }
                 if (invoke instanceof Invoke) {
 -                       serializer.serialize(output, new
 Integer(invoke.getInvokeId()));
 -                       serializer.serialize(output, null);
 +                       serializer.serialize(output, new
 Integer(invoke.getInvokeId()));
 +                       serializer.serialize(output,
 invoke.getConnectionParams());
                 }
                 if (!isPending && (invoke
instanceof Invoke)){
                         IPendingServiceCall pendingCall =
 (IPendingServiceCall) call;

 }}}

 A simple connection sample using the changes. (Works
against Red5 and FMS)

 {{{
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;

 import
org.apache.mina.transport.socket.nio.SocketConnector;
 import org.red5.io.object.Deserializer;
 import org.red5.io.object.Serializer;
 import org.red5.server.net.protocol.ProtocolState;
 import org.red5.server.net.rtmp.Channel;
 import org.red5.server.net.rtmp.IRTMPHandler;
 import org.red5.server.net.rtmp.RTMPConnection;
 import org.red5.server.net.rtmp.RTMPMinaIoHandler;
 import org.red5.server.net.rtmp.codec.RTMP;
 import org.red5.server.net.rtmp.codec.RTMPCodecFactory;
 import org.red5.server.net.rtmp.event.Invoke;
 import org.red5.server.net.rtmp.message.Packet;
 import org.red5.server.service.PendingCall;

 public class SimpleClient implements IRTMPHandler {

         public static void main(String[] args) {
                 RTMPCodecFactory codecFactory=new
RTMPCodecFactory();
                 codecFactory.setDeserializer(new
Deserializer());
                 codecFactory.setSerializer(new
Serializer());
                 codecFactory.init();

                 RTMPMinaIoHandler ioHandler=new
RTMPMinaIoHandler();
                 ioHandler.setCodecFactory(codecFactory);
                 ioHandler.setMode(RTMP.MODE_CLIENT);
                 ioHandler.setHandler(new SimpleClient());

                 SocketConnector connector = new
SocketConnector();
                 connector.connect(new
InetSocketAddress("detroit",1935),
 ioHandler);
         }

         public void connectionOpened(RTMPConnection conn,
RTMP state) {
                 System.out.println("opened");
                 Channel channel=conn.getChannel((byte)3);
                 Map<String,Object> params=new
HashMap<String, Object>();
                
params.put("app","test");
                 params.put("flashVer",
"WIN 9,0,16,0");
                 params.put("swfUrl","http://localhost/test.swf");
                 params.put("tcUrl",
"rtmp://localhost/test");
                 params.put("fpad", false);
                
params.put("audioCodecs",(double)615);
                
params.put("videoCodecs",(double)76);
                 params.put("pageUrl","http://localhost/test.html
");
                
params.put("objectEncoding",(double)0);
                 PendingCall pendingCall=new
PendingCall("connect");
                 Invoke invoke=new Invoke(pendingCall);
                 invoke.setConnectionParams(params);
                 invoke.setInvokeId(1);
                 channel.write(invoke);
         }

         public void messageReceived(RTMPConnection conn,
ProtocolState
 state, Object message) throws Exception {
                 System.out.println("message received
"+message);
                 if(message instanceof Packet) {
                         Packet p=(Packet)message;
                         System.out.println("got
packet "+p.getMessage());
                 }
         }

         public void messageSent(RTMPConnection conn, Object
message) {
                 System.out.println("message sent
"+message);

         }

         public void connectionClosed(RTMPConnection conn,
RTMP state) {
                 System.out.println("connection
closed");

         }

 }
 }}}

-- 
Ticket URL: <http://mirror1.cvsdude.com/trac/osflash/red5/ticket/94>
Red5 <http://osflash.org/red5>
Red5 Server
_______________________________________________
Red5devs mailing list
Red5devs@osflash.org
http://osflash.org/mailman/listinfo/red5devs_osflash.org

[1]

about | contact  Other archives ( Real Estate discussion Medical topics )