lizongbo at 618119.com 工作,生活,Android,前端,Linode,Ubuntu,nginx,java,apache,tomcat,Resin,mina,Hessian,XMPP,RPC

2009年01月15日

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(4.RPC请求和应答的编码解码)

Filed under: Java — 标签:, , , , — lizongbo @ 00:41

接上一篇:http://618119.com/archives/2009/01/13/128.html
1. ProtobufRPCCodecFactory.java
[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;

/**
 * Codec factory for the protobuf RPC server side: hands out the decoder for
 * incoming requests and the encoder for outgoing responses.
 *
 * <p>One decoder and one encoder are created per factory instance and the
 * same objects are returned for every session.
 */
public class ProtobufRPCCodecFactory implements ProtocolCodecFactory {

    /** Decoder instance returned for every session. */
    ProtocolDecoder decoder = new ProtobufRPCRequestProtocolDecoder();

    /** Encoder instance returned for every session. */
    ProtocolEncoder encoder = new ProtobufRPCResponseProtocolEncoder();

    /**
     * Returns the request decoder.
     *
     * @param ioSession the session asking for a decoder (not consulted)
     * @return the decoder held by this factory
     */
    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return this.decoder;
    }

    /**
     * Returns the response encoder.
     *
     * @param ioSession the session asking for an encoder (not consulted)
     * @return the encoder held by this factory
     */
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return this.encoder;
    }

}

[/code]

2. ProtobufRPCRequestProtocolDecoder.java
[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.core.session.AttributeKey;

public class ProtobufRPCRequestProtocolDecoder implements ProtocolDecoder {

private static final AttributeKey BUF_BYTE = new AttributeKey(
ProtobufRPCRequestProtocolDecoder.class, “bufb”);

public void decode(IoSession ioSession, IoBuffer ioBuffer,
ProtocolDecoderOutput protocolDecoderOutput) throws
Exception {
try {
IoBuffer bufTmp = null;
byte[] buf = (byte[]) ioSession.getAttribute(BUF_BYTE);
if (buf == null) {
System.out.println(“没有尚未处理的数据” + ioBuffer.remaining());
bufTmp = ioBuffer;
} else {
System.out.println(“合并尚未处理的数据” + ioBuffer.remaining());
bufTmp = IoBuffer.allocate(buf.length + ioBuffer.remaining());
bufTmp.setAutoExpand(true);
bufTmp.put(buf);
bufTmp.put(ioBuffer);
bufTmp.flip();
} while (bufTmp.remaining() >= 4
&& bufTmp.remaining() >= bufTmp.getInt(bufTmp.position())) { // 循环处理数据包
System.out.println(“循环处理数据包”);
int dataLen = bufTmp.getInt(bufTmp.position());
byte[] b = new byte[dataLen];
bufTmp.get(b);
ProtobufRPCRequest pak = new ProtobufRPCRequest();
pak.setReqByteLen(b.length);
pak.readFrom(b, 4);
System.out.println(“往下传递”);
protocolDecoderOutput.write(pak);
}
if (bufTmp.hasRemaining()) { // 如果有剩余的数据,则放入Session中
System.out.println(“如果有剩余的数据,则放入Session中” + bufTmp.remaining());
byte[] tmpb = new byte[bufTmp.remaining()];
bufTmp.get(tmpb);
ioSession.setAttribute(BUF_BYTE, tmpb);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}

public void dispose(IoSession session) throws Exception {
System.out.println(“dispose”);

}

public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws
Exception {
System.out.println(“finishDecode”);
}

}

[/code]

3. ProtobufRPCResponseProtocolEncoder.java
[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.core.buffer.IoBuffer;

public class ProtobufRPCResponseProtocolEncoder implements ProtocolEncoder {
public void encode(IoSession ioSession, Object object,
ProtocolEncoderOutput protocolEncoderOutput) throws
Exception {
ProtobufRPCResponse res = (ProtobufRPCResponse) object;
IoBuffer buf = IoBuffer.allocate(16, true);
buf.setAutoExpand(true);
byte[] b = res.toByteArray();
System.out.println(“应答包长:”+b.length);
//System.out.println(HessianIO.toHexString(b));
buf.put(b);
buf.flip();
protocolEncoderOutput.write(buf);
}

public void dispose(IoSession ioSession) throws Exception {
}
}

[/code]

没有评论 »

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment

Powered by WordPress