lizongbo at 618119.com 工作,生活,Android,前端,Linode,Ubuntu,nginx,java,apache,tomcat,Resin,mina,Hessian,XMPP,RPC

2009年01月15日

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(6.Client端的接口调用)

Filed under: Java,RPC — lizongbo @ 01:03

接上一篇:http://618119.com/archives/2009/01/15/130.html
1.ProtobufRpcChannel.java(相当于thrift的TBinaryProtocol).
[code]
package com.lizongbo.protobufrpc;

import com.google.protobuf.RpcChannel;
import java.net.URL;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.RpcController;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import java.net.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ProtobufRpcChannel implements RpcChannel {
private URL serviceUrl = null;
public ProtobufRpcChannel(String url) {
try {
try {
URL.setURLStreamHandlerFactory(new
HessianURLStreamHandlerFactory());
} catch (Exception ex) {
ex.printStackTrace();

}
serviceUrl = new URL(url);
} catch (MalformedURLException ex) {
ex.printStackTrace();
}
}

public void callMethod(MethodDescriptor method, RpcController controller,
Message request, Message responsePrototype,
RpcCallback done) {
try {
System.out.println(method.getService().getFullName());
System.out.println(method.getFullName());
method.toProto().toByteArray();
System.out.println(responsePrototype.getClass());
if (“hessiantcp”.equalsIgnoreCase(serviceUrl.getProtocol()) ||
“http”.equalsIgnoreCase(serviceUrl.getProtocol())) {
Socket socket = new Socket(serviceUrl.getHost(),
serviceUrl.getPort());
DataOutputStream out = new DataOutputStream(socket.
getOutputStream());
DataInputStream in = new DataInputStream(socket.getInputStream());
ProtobufRPCRequest req = new ProtobufRPCRequest();
req.setServiceName(method.getService().getFullName());
req.setMethodName(method.getName());
req.setRequestMessage(request.toByteArray());
out.write(req.toByteArray()); //传数据包
out.flush(); //例子此处尚不完善,会有1448错误
int len = in.readInt();
System.out.println(“应答包长:” + len);
byte[] buf = new byte[len – 4];
in.read(buf, 0, buf.length);
Message res = responsePrototype.newBuilderForType().mergeFrom(
buf).build();
done.run(res);
in.close();
out.close();
} else {
//@todo
}
}

catch (Exception e) {
throw new RuntimeException(e);
}

}
}

[/code]

2.RPCTest.java(客户端接口调用)
[code]
package com.lizongbo.protobufrpc;

import java.net.*;
import com.google.protobuf.RpcCallback;
import com.lizongbo.mobileqq.UserInfoServiceRpcController;
import com.lizongbo.mobileqq.QQUserInfoProtos;

public class RPCTest {

public static void main(String[] args) {
for (int i = 123456; i < 123458; i++) { try { ProtobufRpcChannel rpcChannel = new ProtobufRpcChannel( "hessiantcp://127.0.0.1:8080/mobileqq"); UserInfoServiceRpcController controller = new UserInfoServiceRpcController(); QQUserInfoProtos.UserInfoService service = QQUserInfoProtos. UserInfoService.newStub(rpcChannel); QQUserInfoProtos.GetUserInfoRequest.Builder builder = QQUserInfoProtos.GetUserInfoRequest.newBuilder(); builder.setQqNo(123456); QQUserInfoProtos.GetUserInfoRequest req = builder.build(); service.getUserInfo(controller, req, new RpcCallback() {
public void run(QQUserInfoProtos.GetUserInfoResponse
response) {
System.out.println(response.getQqNo() + “,result” +
response.getResult() +
“\t” +
response.getUserInfo());
}
});

} catch (Exception ex) {
ex.printStackTrace();
}

}

}
}

[/code]

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(5.Server端的IoHandler)

Filed under: Java — lizongbo @ 00:49

接上一篇: http://618119.com/archives/2009/01/15/129.html
ProtobufRPCServerSessionHandler.java

[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import java.util.Map;
import java.util.HashMap;
import com.google.protobuf.Service;
import com.lizongbo.mobileqq.UserInfoServiceImpl;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.lizongbo.mobileqq.UserInfoServiceRpcController;

public class ProtobufRPCServerSessionHandler extends IoHandlerAdapter {
/**存放service实例的map*/
private Map servicemap = new HashMap();
public ProtobufRPCServerSessionHandler() {
UserInfoServiceImpl userInfoServiceImpl = new UserInfoServiceImpl();
String serviceName = userInfoServiceImpl.getDescriptor().getFullName();
System.out.println(“put “+serviceName);
servicemap.put(serviceName, userInfoServiceImpl);
}

public void messageReceived(final IoSession session, Object message) throws
Exception {
System.out.println(“messageReceived == ” + session.getClass());
ProtobufRPCRequest req = (ProtobufRPCRequest) message;
Service service = servicemap.get(req.getServiceName());
System.out.println(“service========”+service);
System.out.println(“req.getMethodName()==”+req.getMethodName());
MethodDescriptor md = service.getDescriptorForType().
findMethodByName(req.getMethodName());
System.out.println(“md=========”+md);
Message request = service.
getRequestPrototype(md).
newBuilderForType().mergeFrom(
req.getRequestMessage()).build();

service.callMethod(md,
new UserInfoServiceRpcController(), request,
new
RpcCallback() {
public void run(Message response) {
System.out.println(
“write socket for:” +
response.
toString());
byte[] data = response.
toByteArray();
try {
ProtobufRPCResponse res = new ProtobufRPCResponse();
res.setResponseMessage(data);
session.write(res);
} catch (Exception ex) {
ex.printStackTrace();
}

}
}
);
}

public void exceptionCaught(IoSession session, Throwable cause) throws
Exception {
System.out.println(“exceptionCaught == ” + session.getClass());
cause.printStackTrace();

}
}

[/code]

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(4.RPC请求和应答的编码解码)

Filed under: Java — lizongbo @ 00:41

接上一篇:http://618119.com/archives/2009/01/13/128.html
1.ProtobufRPCCodecFactory.java
[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;

/**
 * Codec factory that pairs the request decoder with the response encoder.
 *
 * <p>One decoder and one encoder instance are created per factory and returned
 * for every session.
 */
public class ProtobufRPCCodecFactory implements ProtocolCodecFactory {

    /** Decoder handed out by {@link #getDecoder}. */
    ProtocolDecoder decoder = new ProtobufRPCRequestProtocolDecoder();

    /** Encoder handed out by {@link #getEncoder}. */
    ProtocolEncoder encoder = new ProtobufRPCResponseProtocolEncoder();

    /**
     * @param ioSession session asking for an encoder (not consulted)
     * @return the factory's encoder instance
     */
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return this.encoder;
    }

    /**
     * @param ioSession session asking for a decoder (not consulted)
     * @return the factory's decoder instance
     */
    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return this.decoder;
    }
}

[/code]

2. ProtobufRPCRequestProtocolDecoder.java
[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.core.session.AttributeKey;

public class ProtobufRPCRequestProtocolDecoder implements ProtocolDecoder {

private static final AttributeKey BUF_BYTE = new AttributeKey(
ProtobufRPCRequestProtocolDecoder.class, “bufb”);

public void decode(IoSession ioSession, IoBuffer ioBuffer,
ProtocolDecoderOutput protocolDecoderOutput) throws
Exception {
try {
IoBuffer bufTmp = null;
byte[] buf = (byte[]) ioSession.getAttribute(BUF_BYTE);
if (buf == null) {
System.out.println(“没有尚未处理的数据” + ioBuffer.remaining());
bufTmp = ioBuffer;
} else {
System.out.println(“合并尚未处理的数据” + ioBuffer.remaining());
bufTmp = IoBuffer.allocate(buf.length + ioBuffer.remaining());
bufTmp.setAutoExpand(true);
bufTmp.put(buf);
bufTmp.put(ioBuffer);
bufTmp.flip();
} while (bufTmp.remaining() >= 4
&& bufTmp.remaining() >= bufTmp.getInt(bufTmp.position())) { // 循环处理数据包
System.out.println(“循环处理数据包”);
int dataLen = bufTmp.getInt(bufTmp.position());
byte[] b = new byte[dataLen];
bufTmp.get(b);
ProtobufRPCRequest pak = new ProtobufRPCRequest();
pak.setReqByteLen(b.length);
pak.readFrom(b, 4);
System.out.println(“往下传递”);
protocolDecoderOutput.write(pak);
}
if (bufTmp.hasRemaining()) { // 如果有剩余的数据,则放入Session中
System.out.println(“如果有剩余的数据,则放入Session中” + bufTmp.remaining());
byte[] tmpb = new byte[bufTmp.remaining()];
bufTmp.get(tmpb);
ioSession.setAttribute(BUF_BYTE, tmpb);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}

public void dispose(IoSession session) throws Exception {
System.out.println(“dispose”);

}

public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws
Exception {
System.out.println(“finishDecode”);
}

}

[/code]

3. ProtobufRPCResponseProtocolEncoder.java
[code]
package com.lizongbo.protobufrpc;

import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.core.buffer.IoBuffer;

public class ProtobufRPCResponseProtocolEncoder implements ProtocolEncoder {
public void encode(IoSession ioSession, Object object,
ProtocolEncoderOutput protocolEncoderOutput) throws
Exception {
ProtobufRPCResponse res = (ProtobufRPCResponse) object;
IoBuffer buf = IoBuffer.allocate(16, true);
buf.setAutoExpand(true);
byte[] b = res.toByteArray();
System.out.println(“应答包长:”+b.length);
//System.out.println(HessianIO.toHexString(b));
buf.put(b);
buf.flip();
protocolEncoderOutput.write(buf);
}

public void dispose(IoSession ioSession) throws Exception {
}
}

[/code]

Older Posts »

Powered by WordPress