Amazon S3高性能读取方案

现状和目标

客户公司每天大约有150,000个流格式数据对象存储到Amazon S3中,为使用这些数据,需要从Amazon S3中快速读取超过千万级的流格式对象并导出为数据分析的格式。

Amazon S3数据读取

S3和Google Storage类似,是一个在线云存储平台,也使用Bucket/key作为存储的格式,其中key中可以带多个分隔符/作为子节点的标识。

// key
/attachments/2014-01/27/cover
由于用户实际存储的数据量非常大,Amazon SDK提供的S3读取接口AmazonS3.listObjects()是分页读取,默认每次返回1000个对象。
AmazonS3 s3Client = new AmazonS3Client(credentials, configuration);
// return first 1,000 objects
s3Client.listObjects(BUCKET);
如果需要完整遍历所有的S3对象,使用ObjectListing.isTruncated()方法检查是否是最后一页数据:
ObjectListing current = s3Client.listObjects(BUCKET);
while (current.isTruncated()) {
    readObject(current);
    current = s3Client.listNextBatchOfObjects(current);
}
readObjects(current);

高性能读取方案

使用Amazon SDK直接读取10,000个S3对象是30分钟,改进读取性能,首先想到的方案是使用Hadoop平台,操作步骤:

  1. 在Amazon EC2上搭建Hadoop平台,因为S3和EC2的数据传输是免费的;
  2. 配置S3作为Hadoop HDFS的替代存储;
  3. 通过Hadoop HDFS的文件操作命令来批量读取S3中的数据。
    参见:http://wiki.apache.org/hadoop/AmazonS3

但这种方式的复杂点在于需要搭建一个Hadoop的环境,数据抽取的程序也需要基于Map Reduce来编写。

第二种方案是使用多线程读取,先尝试了一款支持多线程的开源S3 API jets3t,Hadoop也是基于这款API与S3集成,但它无法忽略掉通过代理访问的警告,直接就退出连接了,估计错误是发生在客户公司的IP访问策略,检查了jets3t代码中HttpClient的部分也未找到对应S3 Configuration的withProxyWorkstation设置,所以未能成功应用。

接下来需要尝试自己编写多线程代码去完成读取。 经过测试,S3遍历Key的速度非常快,所以我们考虑,先将符合目标的Key遍历出来放到一个同步数组中,然后使用多线程程序去从数组中获取Key,再使用Key从S3中取得对象流。

public class MultiDownloadService {
    private LinkedList summaries = new LinkedList();

    public void listKeys() {
        ObjectListing current = s3Client.listObjects(BUCKET)
        while (current.isTruncated()) {
            this.summaries.addAll(current.getObjectSummaries());
            current = s3Client.listNextBatchOfObjects(current);
        }
        this.summaries.addAll(current.getObjectSummaries());
    }

    private void readObject(S3ObjectSummary summary) {
        S3Object object = this.s3Client.getObject(BUCKET, summary.getKey());
        object.getObjectContent();
        // get string from S3ObjectInputStream
    }

    private synchronized S3ObjectSummary getSummary() {
        return this.summaries.poll();
    }

    class ObjectReader implements Runnable {
        public void run() {
            S3ObjectSummary summary = getSummary();
            while (summary != null) {
                readObject(summary);
                summary = getSummary();
            }
        }
    }

    private void multiDownload(int threadCount) {
        for (int i = 1; i <= threadCount; i++) {
            Thread t = new Thread(new ObjectReader());
            t.start();
        }
    }

    public static void main(String args[]) {
        int threadCount = 10;
        if (args != null) {
            threadCount = Integer.parseInt(args[0]);
        }
        MultiDownloadService service = new MultiDownloadService();
        service.listKeys();
        service.multiDownload(threadCount);
    }
}
使用Maven Surefire运行单元测试程序时,因为Surefire本身也是多线程,只有在同时运行多个单元测试时内部的多线程才能完成任务,否则会提前退出。改用Maven编译为可执行程序运行,测试成功。 在30个线程的情况下,下载10,000个对象时间为81秒,比起之前的1829秒,性能提升了95%.
mvn clean compile exec:java -Dexec.args="30"
© 2018 Silent River All Rights Reserved. 本站访客数人次 本站总访问量
Theme by hiero