“RPC”目录存档

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(6.Client端的接口调用)

2009年01月15日,星期四

接上一篇:http://618119.com/archives/2009/01/15/130.html
1.ProtobufRpcChannel.java(相当于thrift的TBinaryProtocol).
[code]
package com.lizongbo.protobufrpc;

import com.google.protobuf.RpcChannel;
import java.net.URL;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.RpcController;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import java.net.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ProtobufRpcChannel implements RpcChannel {
private URL serviceUrl = null;
public ProtobufRpcChannel(String url) {
try {
try {
URL.setURLStreamHandlerFactory(new
HessianURLStreamHandlerFactory());
} catch (Exception ex) {
ex.printStackTrace();

}
serviceUrl = new URL(url);
} catch (MalformedURLException ex) {
ex.printStackTrace();
}
}

public void callMethod(MethodDescriptor method, RpcController controller,
Message request, Message responsePrototype,
RpcCallback done) {
try {
System.out.println(method.getService().getFullName());
System.out.println(method.getFullName());
method.toProto().toByteArray();
System.out.println(responsePrototype.getClass());
if ("hessiantcp".equalsIgnoreCase(serviceUrl.getProtocol()) ||
"http".equalsIgnoreCase(serviceUrl.getProtocol())) {
Socket socket = new Socket(serviceUrl.getHost(),
serviceUrl.getPort());
DataOutputStream out = new DataOutputStream(socket.
getOutputStream());
DataInputStream in = new DataInputStream(socket.getInputStream());
ProtobufRPCRequest req = new ProtobufRPCRequest();
req.setServiceName(method.getService().getFullName());
req.setMethodName(method.getName());
req.setRequestMessage(request.toByteArray());
out.write(req.toByteArray()); //传数据包
out.flush(); //例子此处尚不完善,会有1448错误
int len = in.readInt();
System.out.println("应答包长:" + len);
byte[] buf = new byte[len - 4];
in.read(buf, 0, buf.length);
Message res = responsePrototype.newBuilderForType().mergeFrom(
buf).build();
done.run(res);
in.close();
out.close();
} else {
//@todo
}
}

catch (Exception e) {
throw new RuntimeException(e);
}

}
}

[/code]

2.RPCTest.java(客户端接口调用)
[code]
package com.lizongbo.protobufrpc;

import java.net.*;
import com.google.protobuf.RpcCallback;
import com.lizongbo.mobileqq.UserInfoServiceRpcController;
import com.lizongbo.mobileqq.QQUserInfoProtos;

public class RPCTest {

public static void main(String[] args) {
for (int i = 123456; i < 123458; i++) {
try {
ProtobufRpcChannel rpcChannel = new ProtobufRpcChannel(
"hessiantcp://127.0.0.1:8080/mobileqq");
UserInfoServiceRpcController controller = new
UserInfoServiceRpcController();
QQUserInfoProtos.UserInfoService service = QQUserInfoProtos.
UserInfoService.newStub(rpcChannel);
QQUserInfoProtos.GetUserInfoRequest.Builder builder =
QQUserInfoProtos.GetUserInfoRequest.newBuilder();
builder.setQqNo(123456);
QQUserInfoProtos.GetUserInfoRequest req = builder.build();
service.getUserInfo(controller,
req,
new RpcCallback GetUserInfoResponse>() {
public void run(QQUserInfoProtos.GetUserInfoResponse
response) {
System.out.println(response.getQqNo() + ",result" +
response.getResult() +
"\t" +
response.getUserInfo());
}
});

} catch (Exception ex) {
ex.printStackTrace();
}

}

}
}

[/code]

Tags: apache mina, google protobuf, Hessian, Java, RPC

Related posts

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(2.service端)

2009年01月13日,星期二

接上一篇:http://618119.com/archives/2009/01/10/124.html
1.接口功能实现:

[code]
package com.lizongbo.mobileqq;

import com.lizongbo.mobileqq.QQUserInfoProtos.*;
import com.google.protobuf.RpcController;
import com.google.protobuf.RpcCallback;

public class UserInfoServiceImpl extends UserInfoService {
public void getUserInfo(RpcController controller,
GetUserInfoRequest request,
RpcCallback < com.lizongbo.mobileqq.
QQUserInfoProtos.GetUserInfoResponse > done) {

GetUserInfoResponse.Builder builder = GetUserInfoResponse.
newBuilder();
builder.setQqNo(request.getQqNo());

if (request.getQqNo() == 123456) {
builder.setResult(0);
QQUser.Builder qb = QQUser.newBuilder();
qb.setQqNo(request.getQqNo());
qb.setNickNmae("lizongbo");
qb.setPostCode("618100");
qb.setSignName("google Protocol Buffers 远程调用");
qb.setAddress("google");
builder.setUserInfo(qb);
} else {
builder.setResult( -1);
}
done.run(builder.build());
}

public void updateUserInfo(RpcController controller,
UpdateUserInfoRequest request,
RpcCallback < com.lizongbo.mobileqq.
QQUserInfoProtos.UpdateUserInfoResponse > done) {
UpdateUserInfoResponse.Builder builder = UpdateUserInfoResponse.
newBuilder();
builder.setQqNo(request.getQqNo());

if (request.getQqNo() == 123456) {
builder.setResult(0);
builder.setMessage("成功!");
} else {
builder.setResult( -1);
builder.setMessage("失败!");
}
done.run(builder.build());
}

}

[/code]

2.RpcController接口的实现:

[code]
package com.lizongbo.mobileqq;

import com.google.protobuf.RpcController;
import com.google.protobuf.RpcCallback;

public class UserInfoServiceRpcController implements RpcController {

public String errorText() {
return "no error";
}

public boolean failed() {
return false;
}

public boolean isCanceled() {
return false;
}

public void notifyOnCancel(RpcCallback callback) {
System.out.println("run RpcCallback by notifyOnCancel");
}

public void reset() {
System.out.println("do nothing");
}

public void setFailed(String reason) {
System.out.println("setFailed " + reason);
}

public void startCancel() {
System.out.println("run startCancel");
}
}

[/code]

Tags: apache mina, google protobuf, Hessian, Java, RPC

Related posts

选择或设计实现远程RPC调用所需要考虑的n个方面

2008年08月27日,星期三

选择或设计实现远程RPC调用所需要考虑的n个方面

(前段时间记录的零散片段,却一直没时间仔细整理,仅记录在此,以做备忘。)
可参考协议(组件)为:
http,smtp,pop,ftp,dns,burlap,json,xml-rpc,xmpp,smpp,rmi,soap,hessian,thrift , protocol buffers等等
需要考虑以下方面:

必备条件:
a.多语言支持
b.强大的序列化和反序列化(xml,text,pdu,text+stream)
c.多种传输模式

1.服务端所支持的编程语言。
java,C,C++,C#,PHP等等

2.客户端支持的语言。
java,C,C++,JavaScript等

3.字节流转码。
支持GBK,UTF-8等。

4.支持的基础数据类型。
char,int,long,double,floadt,boolean,List,Map,Object(Struct)

5.同名重载方法区别

6.支持数据包的转发代理 (前端负载均衡)

7.自定义超时连接

8.数据包转换 (对象的序列化和反序列化)

9.数据类型扩展

10.类与接口实现的约束

11.是否支持负载限制 (比如超过100并发时,直接返回系统忙)

12.failover处理

13.事务控制

14.异步调用

15.数据包版本自动识别

16.TCP/UDP支持
(UDP 有1472字节限制。)

17.代码生成器(主要针对数据包的序列化)

18.与现有框架的整合难度(组件化程度)

19.大文件传输

21.数据包序列化效率

客户端连接池
应用范围

22.网络带宽约束(数据包是否支持压缩)

23.服务接口监控统计

数据包加密

SSL支持

是否需要握手消息(是需多次交互还是简单的请求应答模式)

调试抓包的方便性

是否支持双向调用(xmpp支持)

部署难易程度

参数配置难度

是否支持请求队列

请求应答的对应关系

协议可读性

支持客户端并发限制(比如同一ip每秒只能够调用10次)

断点续传和重发数据

URL设置

access.log配置与合并,自定义 logger(用于控制是否打印敏感信息)

过滤器拦截模式

接口代码可读性

常用端口选择

重启服务时的热切换

黏性会话支持(基于IP,或者基于sessionid)

安全端口

是否可以多端口

配置文件

是否能够方便获取真实客户端ip和端口
MDC支持
是否支持线程局部变量(类似webservice获取session,ICE的Current)

防雪崩(提供初始化预热加载)

Tags: google protobuf, Hessian, RPC, thrift

Related posts