RPC Framework Series: Avro

September 9, 2011 / Avro, RPC, serialization

1. Download and Installation

Official site: http://avro.apache.org/

Download: http://mirror.bit.edu.cn/apache/avro/avro-1.7.7/avro-src-1.7.7.tar.gz

Make sure Maven is installed before building.

cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/avro/avro-1.7.7/avro-src-1.7.7.tar.gz
tar zxvf avro-src-1.7.7.tar.gz
cd avro-src-1.7.7/lang/java
mvn clean install -DskipTests

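After the build completes, the generated jar packages can be found as follows:

2. Message Structure and Service Interface

Avro schemas are mostly expressed as JSON objects. Avro supports 8 primitive types and 6 complex types (records, enums, arrays, maps, unions, and fixed). A primitive type can be expressed as a plain JSON string naming the type, for example "string".

Avro supports two serialization encodings, binary and JSON. Binary encoding serializes faster and produces smaller output.

Primitive types: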

null: no value
boolean: a binary value
int: 32-bit signed integer
long: 64-bit signed integer
float: single precision (32-bit) IEEE 754 floating-point number
double: double precision (64-bit) IEEE 754 floating-point number
bytes: sequence of 8-bit unsigned bytes
string: unicode character sequence

First, write a message.avpr file that defines a message structure.

{
    "namespace": "avro",
    "protocol": "MessageProtocol",
    "doc": "This is a message.",
    "name": "Message",
    "types": [
        {"name":"Message", "type":"record",
            "fields":[
                {"name":"name", "type":"string"},
                {"name":"type", "type":"int"},
                {"name":"price", "type":"double"},
                {"name":"valid", "type":"boolean"},
                {"name":"content", "type":"bytes"},
                {"name":"tags", "type":{"type":"array", "items":"string"}}
        ]}
    ],
    "messages":    {
        "sendMessage":{
            "doc" : "test",
            "request" :[{"name":"message","type":"Message" }],
            "response" :"Message"
        }          
    }    
}

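This defines one type named Message with six fields: name, type, price, valid, content, and tags. It also defines one message service named sendMessage, which takes a single parameter of type Message and returns a Message.

Run the following command to generate the Java code:

$ java -jar /path/to/avro-tools-1.7.7.jar compile protocol message.avpr .

This creates an avro directory under the current directory containing two Java files: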

Message.java

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  @Deprecated public java.lang.CharSequence name;
  @Deprecated public int type;
  @Deprecated public double price;
  @Deprecated public boolean valid;
  @Deprecated public java.nio.ByteBuffer content;
  @Deprecated public java.util.List<java.lang.CharSequence> tags;
  ...
}

MessageProtocol.java

/**
 * Autogenerated by Avro
 * 
 * DO NOT EDIT DIRECTLY
 */
package avro;

@SuppressWarnings("all")
/** This is a message. */
@org.apache.avro.specific.AvroGenerated
public interface MessageProtocol {
  public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"MessageProtocol\",\"namespace\":\"avro\",\"doc\":\"This is a message.\",\"name\":\"Message\",\"types\":[{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"int\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"valid\",\"type\":\"boolean\"},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}],\"messages\":{\"sendMessage\":{\"doc\":\"test\",\"request\":[{\"name\":\"message\",\"type\":\"Message\"}],\"response\":\"Message\"}}}");
  /** test */
  avro.Message sendMessage(avro.Message message) throws org.apache.avro.AvroRemoteException;

  @SuppressWarnings("all")
  /** This is a message. */
  public interface Callback extends MessageProtocol {
    public static final org.apache.avro.Protocol PROTOCOL = avro.MessageProtocol.PROTOCOL;
    /** test */
    void sendMessage(avro.Message message, org.apache.avro.ipc.Callback<avro.Message> callback) throws java.io.IOException;
  }
}

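3. Serialization

Avro has two serialization encodings: binary and JSON.

3.1. Binary Encoding

Primitive types:

null: zero bytes

boolean: 1 byte, either 0 (false) or 1 (true)

int and long: variable-length zig-zag encoding

float: 4 bytes

double: 8 bytes

bytes: a long giving the length, followed by that many bytes

string: a long giving the byte length, followed by that many bytes of UTF-8 encoded character data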
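The variable-length zig-zag encoding used for int and long can be sketched in plain Java without the Avro library. This is a minimal illustration, not Avro's own implementation; the class name ZigZagVarint and its helper methods are made up for this example.

```java
import java.io.ByteArrayOutputStream;

public class ZigZagVarint {
    // Zig-zag maps signed values to unsigned ones so that small magnitudes
    // get small codes: 0, -1, 1, -2, ... become 0, 1, 2, 3, ...
    static long zigZag(long n) {
        return (n << 1) ^ (n >> 63);
    }

    // Emit the zig-zag value 7 bits at a time, low-order group first.
    // The high bit of each byte signals that more bytes follow.
    static byte[] encodeLong(long n) {
        long z = zigZag(n);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
        return out.toByteArray();
    }

    // Format bytes as space-separated lowercase hex.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(encodeLong(27))); // 36
        System.out.println(hex(encodeLong(-1))); // 01
        System.out.println(hex(encodeLong(64))); // 80 01
    }
}
```

Values between -64 and 63 fit in a single byte, which is why small lengths and counts cost very little in the binary encoding.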

3.2. records

A record is encoded by encoding its field values in the order the fields are declared. Consider the following record schema:

{
    "type": "record", 
    "name": "test",
    "fields" : [
        {"name": "a", "type": "long"},
        {"name": "b", "type": "string"}]
}

For an instance of this record whose a field is 27 (encoded as 0x36) and whose b field is "foo" (encoded as 06 66 6f 6f), the encoded record is:

36 06 66 6f 6f
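The byte sequence above can be reproduced by hand with a small sketch that writes the long field followed by the string field. It does not use the Avro library; RecordEncodingSketch and its methods are hypothetical names for this illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class RecordEncodingSketch {
    // Zig-zag varint, as used for Avro int and long values.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Encode the "test" record: field a first, then field b, with no
    // field names or separators in between.
    static byte[] encodeTest(long a, String b) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, a);
        writeString(out, b);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(encodeTest(27, "foo"))); // 36 06 66 6f 6f
    }
}
```

Because nothing but the field values is written, the reader needs the same schema (or a compatible one) to know where each field starts and ends.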

3.3. enums

An enum is encoded as an int giving the zero-based position of the symbol in the schema. For example, consider this enum:

{"type": "enum", "name": "Foo", "symbols": ["A", "B", "C", "D"] }

This is encoded as an int in the range [0,3], where 0 represents "A" and 3 represents "D".
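Note that the int itself is still written with zig-zag encoding, so the byte on the wire is not the raw index. A rough sketch, assuming the Foo enum above (EnumEncodingSketch is a hypothetical name, and no Avro classes are involved):

```java
import java.util.Arrays;
import java.util.List;

public class EnumEncodingSketch {
    // Symbols in the order they are declared in the schema.
    static final List<String> SYMBOLS = Arrays.asList("A", "B", "C", "D");

    // Zero-based position of the symbol in the schema.
    static int indexOf(String symbol) {
        int index = SYMBOLS.indexOf(symbol);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown symbol: " + symbol);
        }
        return index;
    }

    // Zig-zag encode the index. For indexes below 64 the result fits in
    // one byte, so a single int is enough for this sketch.
    static int wireByte(String symbol) {
        int index = indexOf(symbol);
        return (index << 1) ^ (index >> 31);
    }

    public static void main(String[] args) {
        System.out.printf("A -> %02x%n", wireByte("A")); // A -> 00
        System.out.printf("D -> %02x%n", wireByte("D")); // D -> 06
    }
}
```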

3.4. arrays

Arrays are encoded as a series of blocks. Each block consists of a long count value followed by that many array items. A block with a count of 0 marks the end of the array.
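As an illustration, an array of longs can be written as a single block followed by the terminating zero count. This is a hand-rolled sketch that assumes all items go into one block; ArrayEncodingSketch is a hypothetical name, not an Avro class.

```java
import java.io.ByteArrayOutputStream;

public class ArrayEncodingSketch {
    // Zig-zag varint, as used for Avro int and long values.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Write one block holding every item, then a zero count to end the array.
    static byte[] encodeLongArray(long[] items) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (items.length > 0) {
            writeLong(out, items.length);
            for (long item : items) {
                writeLong(out, item);
            }
        }
        writeLong(out, 0);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Count 2 (04), items 3 (06) and 27 (36), end marker (00).
        System.out.println(hex(encodeLongArray(new long[] {3, 27}))); // 04 06 36 00
    }
}
```

An empty array is just the single end marker 00.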

3.5. maps

Maps are encoded as a series of blocks. Each block consists of a long count value followed by that many key/value pairs. A block with a count of 0 marks the end of the map.
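A map with string keys and long values might be written along these lines, with each key encoded as a string immediately before its value. The sketch puts all entries in one block and uses a LinkedHashMap only to keep the output order predictable; MapEncodingSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapEncodingSketch {
    // Zig-zag varint, as used for Avro int and long values.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Map keys are strings: UTF-8 byte length, then the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // One block with every key/value pair, then a zero count to end the map.
    static byte[] encodeMap(Map<String, Long> map) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (!map.isEmpty()) {
            writeLong(out, map.size());
            for (Map.Entry<String, Long> e : map.entrySet()) {
                writeString(out, e.getKey());
                writeLong(out, e.getValue());
            }
        }
        writeLong(out, 0);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Long> map = new LinkedHashMap<>();
        map.put("a", 1L);
        // Count 1 (02), key "a" (02 61), value 1 (02), end marker (00).
        System.out.println(hex(encodeMap(map))); // 02 02 61 02 00
    }
}
```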

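3.6. union

A union is encoded by first writing a long that gives the zero-based position, within the union, of the schema of the value that follows. The value is then encoded according to that schema.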
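For instance, with a union schema of ["null","string"], null sits at position 0 and string at position 1. The sketch below assumes exactly that two-branch union and is not taken from the Avro library; UnionEncodingSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class UnionEncodingSketch {
    // Zig-zag varint, as used for Avro int and long values.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Encode a value of the union ["null","string"].
    static byte[] encodeNullableString(String value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (value == null) {
            writeLong(out, 0); // branch 0: null, which has no payload bytes
        } else {
            writeLong(out, 1); // branch 1: string
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            writeLong(out, utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(encodeNullableString(null))); // 00
        System.out.println(hex(encodeNullableString("a")));  // 02 02 61
    }
}
```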

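3.7. fixed

A fixed instance is encoded using the number of bytes declared in its schema.

4. RPC Communication Implementation

4.1 Using the Generated Java Code

Protocol implementation, MessageProtocolImpl.java: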

package avro;

import org.apache.avro.AvroRemoteException;

public class MessageProtocolImpl implements MessageProtocol {
    @Override
    public Message sendMessage(Message message) throws AvroRemoteException {
        System.out.println("Got a new message:");
        System.out.println("\tname: " + message.getName());
        System.out.println("\ttype: " + message.getType());
        System.out.println("\tprice: " + message.getPrice());
        System.out.println("\tvalid: " + message.getValid());
        System.out.println("\tcontent: " + new String(message.getContent().array()));
        System.out.print("\ttags:");
        for (CharSequence tag: message.getTags()) {
            System.out.print(" " + tag);
        }
        System.out.println(); // end the tags line
        return message;
    }
}

Server implementation, Server.java:

package avro;

import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.specific.SpecificResponder;

import java.net.InetSocketAddress;

public class Server {
    private org.apache.avro.ipc.Server server;
    private int port;
    public Server(int port) {
        this.port = port;
    }

    public void start() {
        try {
            server = new NettyServer(new SpecificResponder(MessageProtocol.class,
                    new MessageProtocolImpl()), new InetSocketAddress(port));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: Server port");
            System.exit(0);
        }
        int port = Integer.parseInt(args[0]);
        System.out.println("Starting server");
        new Server(port).start();
        System.out.println("Server started");
    }
}

Client implementation, Client.java:

package avro;

import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
public class Client {
    private String host = null;
    private int port = 0;
    private int size = 0;
    private int count = 0;
    public Client(String host, int port, int size, int count) {
        this.host = host;
        this.port = port;
        this.size = size;
        this.count = count;
    }
    public long sendMessage() throws Exception {
        NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(host, port));

        MessageProtocol proxy = SpecificRequestor.getClient(
                MessageProtocol.class, client);

        Message message = Message.newBuilder()
                .setName("test")
                .setType(1)
                .setPrice(999.99)
                .setValid(true)
                .setContent(ByteBuffer.wrap("Hello".getBytes()))
                .setTags(new ArrayList<CharSequence>(){
                    {
                        add("test");
                        add("hello");
                        add("world");
                    }
                })
                .build();

        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            proxy.sendMessage(message);
        }
        long end = System.currentTimeMillis();
        client.close(); // release the connection once the timed loop has finished
        System.out.println((end - start) + " ms");
        return end - start;
    }
    public long run() {
        long res = 0;
        try {
            res = sendMessage();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return res;
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.out.println("Usage: Client host port dataSize count");
            System.exit(0);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        int size = Integer.parseInt(args[2]);
        int count = Integer.parseInt(args[3]);
        new Client(host, port, size, count).run();
    }
}

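4.2 Without Defining a Service Interface

Avro RPC can also be used without a generated service interface, but the protocol must then be parsed from the .avpr file, which defines both the message structure and the message service. message.avpr defines one type named Message and one service named sendMessage.

Utility class, Utils.java: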

package avro;

import java.io.File;
import java.io.IOException;
import java.net.URL;

import org.apache.avro.Protocol;

public class Utils {
    public static Protocol getProtocol() {
        Protocol protocol = null;
        try {
            URL url = Utils.class.getClassLoader().getResource("message.avpr");
            protocol = Protocol.parse(new File(url.getPath()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return protocol;
    }
}

Server implementation, Server.java:

package avro;

import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.generic.GenericResponder;

public class Server extends GenericResponder {
    private Protocol protocol = null;
    private int port;

    public Server(Protocol protocol, int port) {
        super(protocol);
        this.protocol = protocol;
        this.port = port;
    }

    public Object respond(Message message, Object request) throws Exception {
        GenericRecord req = (GenericRecord) request;
        GenericRecord msg = (GenericRecord)(req.get("message"));
        // process the request
        …
        return msg;
    }

    public void run() {
        try {
            HttpServer server = new HttpServer(this, port);

            server.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: Server port");
            System.exit(0);
        }
        int port = Integer.parseInt(args[0]);
        new Server(Utils.getProtocol(), port).run();
    }
}

Client implementation, Client.java:

package avro;

import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.avro.util.Utf8;
import org.apache.avro.Protocol;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.generic.GenericRequestor;

public class Client {
    private Protocol protocol = null;
    private String host = null;
    private int port = 0;
    private int size = 0;
    private int count = 0;

    public Client(Protocol protocol, String host, int port, int size, int count) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
        this.size = size;
        this.count = count;
    }

    public long sendMessage() throws Exception {
        // Type names are case-sensitive; message.avpr declares the record as "Message".
        GenericRecord requestData = new GenericData.Record(
                protocol.getType("Message"));
        // initialize the request data
        …

        GenericRecord request = new GenericData.Record(protocol.getMessages()
                .get("sendMessage").getRequest());
        request.put("message", requestData);

        Transceiver t = new HttpTransceiver(new URL("http://" + host + ":"
                + port));
        GenericRequestor requestor = new GenericRequestor(protocol, t);

        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            requestor.request("sendMessage", request);
        }
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        return end - start;
    }

    public long run() {
        long res = 0;
        try {
            res = sendMessage();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return res;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.out.println("Usage: Client host port dataSize count");
            System.exit(0);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        int size = Integer.parseInt(args[2]);
        int count = Integer.parseInt(args[3]);
        new Client(Utils.getProtocol(), host, port, size, count).run();
    }
}

5. References

  1. Avro Documentation: http://avro.apache.org/docs/current/index.html
  2. Getting Started Java: http://avro.apache.org/docs/1.7.6/gettingstartedjava.html