2009年01月 存档

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(3.定义RPC请求和应答)

2009年01月13日,星期二

接上一篇: http://618119.com/archives/2009/01/13/127.html

google Protocol Buffers RPC的应答包:
[code]
package com.lizongbo.protobufrpc;

import .io.*;
import java.nio.ByteBuffer;

public class ProtobufRPCRequest {
private String serviceName;
private String methodName;
private byte[] requestMessage;
private int reqByteLen;
public String getServiceName() {
return serviceName;
}

public String getMethodName() {
return methodName;
}

public byte[] getRequestMessage() {

return requestMessage;
}

public int getReqByteLen() {
return reqByteLen;
}

public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public void setRequestMessage(byte[] requestMessage) {

this.requestMessage = requestMessage;
}

public void setReqByteLen(int reqByteLen) {
this.reqByteLen = reqByteLen;
}

public void readFrom(byte[] b, int offset) {
System.out.println("readFrom:" + new String(b));
java.io.ByteArrayInputStream bai = new ByteArrayInputStream(b, offset,
b.length - offset);
DataInputStream buf = new DataInputStream(bai);

try {
// buf.getInt();
int serviceNameLen = buf.readInt();
System.out.println("serviceNameLen==" + serviceNameLen);
byte bt[] = new byte[serviceNameLen];
buf.read(bt);
serviceName = new String(bt);
System.out.println("serviceName==" + serviceName);
int methodNameLen = buf.readInt();
System.out.println("methodNameLen==" + methodNameLen);
bt = new byte[methodNameLen];
buf.read(bt);
methodName = new String(bt);
int requestLen = buf.readInt();
System.out.println("requestLen==" + requestLen);
requestMessage = new byte[requestLen];
buf.read(requestMessage);
} catch (Exception ex) {
ex.printStackTrace();
}
}

public byte[] toByteArray() {
java.io.ByteArrayOutputStream bao = new ByteArrayOutputStream();
DataOutputStream out = new
DataOutputStream(bao);

try {
byte[] serviceNameb = serviceName.getBytes();
System.out.println("serviceNameLen==" + serviceNameb.length);
out.writeInt(serviceNameb.length);
out.write(serviceNameb);
byte[] methodNameb = methodName.getBytes();
System.out.println("methodNameLen==" + methodNameb.length);
out.writeInt(methodNameb.length);
out.write(methodNameb);
System.out.println("requestMessageLen==" + requestMessage.length);
out.writeInt(requestMessage.length);
out.write(requestMessage);
byte[] reqByte = bao.toByteArray();
reqByteLen = reqByte.length;
bao = new ByteArrayOutputStream();
out = new DataOutputStream(bao);
System.out.println("reqByteLen==" + (reqByte.length + 4));
out.writeInt(reqByte.length + 4);
out.write(reqByte);
} catch (IOException ex) {
ex.printStackTrace();
}
return bao.toByteArray();
}

}

[/code]

google Protocol Buffers RPC的应答包:
[code]
package com.lizongbo.protobufrpc;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;

public class ProtobufRPCResponse {
private int resByteLen;
private byte[] responseMessage;
public int getResByteLen() {
return resByteLen;
}

public byte[] getResponseMessage() {
return responseMessage;
}

public void setResByteLen(int resByteLen) {
this.resByteLen = resByteLen;
}

public void setResponseMessage(byte[] responseMessage) {
this.responseMessage = responseMessage;
}

public void readFrom(byte[] b, int offset) {

try {//此处不够完善
java.io.ByteArrayInputStream bai = new ByteArrayInputStream(b, offset,
b.length - offset);
DataInputStream buf = new DataInputStream(bai);
resByteLen = buf.readInt();
responseMessage = new byte[resByteLen - 4];
buf.read(responseMessage);
} catch (Exception ex) {
ex.printStackTrace();
}
}

public byte[] toByteArray() {
java.io.ByteArrayOutputStream bao = new ByteArrayOutputStream();
DataOutputStream out = new
DataOutputStream(bao);

try {
out.writeInt(responseMessage.length + 4);
out.write(responseMessage);
} catch (IOException ex) {
ex.printStackTrace();
}
return bao.toByteArray();
}

}

[/code]

Tags: , , , ,

基于apache mina 2.0.0 M4和google Protocol Buffers 2.0.3的java RPC实例(2.service端)

2009年01月13日,星期二

接上一篇:http://618119.com/archives/2009/01/10/124.html
1.接口功能实现:

[code]
package com.lizongbo.mobileqq;

import com.lizongbo.mobileqq.QQUserInfoProtos.*;
import com.google.protobuf.RpcController;
import com.google.protobuf.RpcCallback;

public class UserInfoServiceImpl extends UserInfoService {
public void getUserInfo(RpcController controller,
GetUserInfoRequest request,
RpcCallback < com.lizongbo.mobileqq.
QQUserInfoProtos.GetUserInfoResponse > done) {

GetUserInfoResponse.Builder builder = GetUserInfoResponse.
newBuilder();
builder.setQqNo(request.getQqNo());

if (request.getQqNo() == 123456) {
builder.setResult(0);
QQUser.Builder qb = QQUser.newBuilder();
qb.setQqNo(request.getQqNo());
qb.setNickNmae("lizongbo");
qb.setPostCode("618100");
qb.setSignName("google Protocol Buffers 远程调用");
qb.setAddress("google");
builder.setUserInfo(qb);
} else {
builder.setResult( -1);
}
done.run(builder.build());
}

public void updateUserInfo(RpcController controller,
UpdateUserInfoRequest request,
RpcCallback < com.lizongbo.mobileqq.
QQUserInfoProtos.UpdateUserInfoResponse > done) {
UpdateUserInfoResponse.Builder builder = UpdateUserInfoResponse.
newBuilder();
builder.setQqNo(request.getQqNo());

if (request.getQqNo() == 123456) {
builder.setResult(0);
builder.setMessage("成功!");
} else {
builder.setResult( -1);
builder.setMessage("失败!");
}
done.run(builder.build());
}

}

[/code]

2.RpcController接口的实现:

[code]
package com.lizongbo.mobileqq;

import com.google.protobuf.RpcController;
import com.google.protobuf.RpcCallback;

public class UserInfoServiceRpcController implements RpcController {

public String errorText() {
return "no error";
}

public boolean failed() {
return false;
}

public boolean isCanceled() {
return false;
}

public void notifyOnCancel(RpcCallback callback) {
System.out.println("run RpcCallback by notifyOnCancel");
}

public void reset() {
System.out.println("do nothing");
}

public void setFailed(String reason) {
System.out.println("setFailed " + reason);
}

public void startCancel() {
System.out.println("run startCancel");
}
}

[/code]

Tags: , , , ,

扩展java.net.URL支持自定义协议来优化hessian的调用

2009年01月11日,星期日

扩展java.net.URL支持自定义协议来优化hessian调用

hessian是个高性能的java RPC调用协议,但是官方默认只提供了基于http和https两种方式的远程调用。
虽然每天使用http方式调用上千万次也没出现性能问题,(有jdk一份功劳,jdk1.5及以上版本支持了http.KeepAlive,
默认设置为: .KeepAlive.remainingData=512
http.KeepAlive.queuedConnections=10)
但是如果能够改成tcp纯socket长连接池方式,性能是还可以优化的,因为把http的header头信息给省了七七八八。
由于hessian使用的URL和URLConnection来发送hessian请求和应答的,而URL的协议处理是可以扩展的,
因此可以通过扩展URL支持自定义协议来灵活切换hessian使用http或者tcp或者udp方式进行请求发送和接收应答。

查找相关资料后整理了三种扩展方法:

1.通过用户指定的package名称的最后一位作为协议名称(包名要是小写的)。
例如我自定义了三个协议,hessiantcp,hessianudp,hessiantcpudp;
则需要建立三个继承java.net.URLStreamHandler的Handler类(实现类的名字必须是Handler).
即:
com.lizongbo..protocol.hessiantcp.Handler.
com.lizongbo.hessian.protocol.hessianudp.Handler.java
com.lizongbo.hessian.protocol.hessiantcpudp.Handler.java

在运行时,还要指定系统属性java.protocol.handler.pkgs
或者在java命令行里增加启动参数:
-Djava.protocol.handler.pkgs=com.lizongbo.hessian.protocol(多个包名之间用竖线隔开,例如:
-Djava.protocol.handler.pkgs=com.lizongbo.hessian.protocola|com.lizongbo.hessian.protocolb)

或者在代码里调用创建URL之前,先执行:
[code]
System.setProperty("java.protocol.handler.pkgs","com.lizongbo.hessian.protocol");
serviceUrl = new ("hessiantcp://618119.com/blog/hessian/service");
[/code]
这样,用户便能够通过URL对象处理hessiantcp://这样的协议了,
否则,使用hessiantcp://这样的协议会导致异常。

参考:http://www.tuscany.org.cn/index.php/Tuscany与JBoss集成中遇到的问题及排除

http://java.sun.com/developer/onlineTraining/protocolhandlers/

2.设置指定的URLStreamHandlerFactory也可以扩展自定义的协议。
[code]
package com.lizongbo.hessian.protocol;

import java.net.*;
import java.util.Hashtable;

class HessianURLStreamHandlerFactory implements URLStreamHandlerFactory {
private String packagePrefix = "com.lizongbo.hessian.protocol";
protected static Hashtable handlers
= new Hashtable
();
private URLStreamHandlerFactory otherFactory;
static {
handler = new com.lizongbo.hessian.protocol.
hessiantcp.Handler();
handlers.put("hessiantcp", handler);
handler = new com.lizongbo.hessian.protocol.hessianudp.Handler();
handlers.put("hessianudp", handler);
handler = new com.lizongbo.hessian.protocol.hessiantcpudp.Handler();
handlers.put("hessiantcpudp", handler);

}

public HessianURLStreamHandlerFactory() {
this(null);
}

public HessianURLStreamHandlerFactory(URLStreamHandlerFactory otherFactory) {
this.setOtherFactory(otherFactory);
}

public void setOtherFactory(URLStreamHandlerFactory otherFactory) {
this.otherFactory = otherFactory;
}

public URLStreamHandlerFactory getOtherFactory() {
return otherFactory;
}

public URLStreamHandler createURLStreamHandler(String protocol) {
URLStreamHandler handler = handlers.get(protocol);
if (handler != null) {
return handler;
}
try {
String clsName = packagePrefix + "." + protocol + ".Handler";
Class cls = null;
try {
cls = Class.forName(clsName);
} catch (ClassNotFoundException e) {
ClassLoader cl = ClassLoader.getSystemClassLoader();
if (cl != null) {
cls = cl.loadClass(clsName);
}
}
if (cls != null) {
handler = (URLStreamHandler) cls.newInstance();
handlers.put(protocol, handler);
return handler;
}
} catch (Exception e) {
// any number of exceptions can get thrown here
}

if (otherFactory != null) {
return otherFactory.createURLStreamHandler(protocol);
}
if ("http".equalsIgnoreCase(protocol)) {
return null; //返回非null的URLStreamHandler还可以覆盖java默认实现协议的URLStreamHandler
}

return null;
}

}

[/code]

在代码里调用创建URL之前,先执行:
[code]
static{
try {
/**
该行代码只能执行一次,否则会抛出工厂已经定义的错误,错误信息如下:
java.lang.Error: factory already defined
at java.net.URL.setURLStreamHandlerFactory(URL.java:1074)
这样的方式还有个缺点,就是工厂一旦被其它第三方组件占用,那么使用这个方法就只能二者选一,
除非其它组件支持创建URLStreamHandlerFactory实例,
因为java.net.URL是不提供获取已经设置存在的factory的方法的。

*/
URL.setURLStreamHandlerFactory(new HessianURLStreamHandlerFactory());
} catch (Exception ex) {

}
}
URL serviceUrl = new URL("hessiantcp://618119.com/blog/hessian/service");
[/code]

早期java没有自带jsse的时候,想要使用到https协议就需要类似的处理,
参考: http://www.javaworld.com/javaworld/javatips/jw-javatip96.html

3.在创建URL的时候,手工识别,并实现自定义协议所需的URLStreamHandler.
代码如下:
public ProtobufRpcChannel(String url) {
try {
if (url != null && url.toLowerCase().startsWith(“hessiantcp://”)) {
serviceUrl = new URL(null, url, new URLStreamHandler() {
protected URLConnection openConnection(URL u) throws
IOException {
return null;//在这里处理
}
});
} else {
serviceUrl = new URL(url);
}
} catch (MalformedURLException ex) {
ex.printStackTrace();

}

可以参考一个jms协议的扩展例子:http://www.ibm.com/developerworks/cn/java/l-jms/index.html

有个注意事项,如果试用了非http方式的发送hessian请求,
接口调用方法的返回值不能够是java.io.InputStream,因为hessian的代码里写死了:
在返回InputStream的时候,连接类别被强行转成HttpURLConnection(但是ResultInputStream里对httpConn并没啥特别的操作,
暂时没看懂作者为何这样写的)

[code]
Object value = in.readObject(method.getReturnType());

if (value instanceof InputStream) {
value = new ResultInputStream(httpConn, is, in, (InputStream) value);
is = null;
httpConn = null;
}
[/code]

Tags: , , , , , , ,