设为首页收藏本站

小牛社区-大数据学习交流社区|大数据免费学习资源

 找回密码
 立即注册!

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 445|回复: 0

Spark2.1.0与CarbonData1.0.0集群模式部署及使用入门指南

[复制链接]

155

主题

0

帖子

5

积分

吃土小白

Rank: 1

积分
5
发表于 2017-10-24 15:51:07 | 显示全部楼层 |阅读模式
  ApacheCarbonData是一个面向大数据平台的基于索引的列式数据格式,由华为大数据团队贡献给Apache社区,目前最新版本是1.0.0版。介于目前主流大数据组件应用场景的局限性,CarbonData诞生的初衷是希望通过仅保存一份数据来满足不同的应用场景,如:  
OLAP  顺序存取(Sequential Access)  随机存取(Random Access) 
  CarbonData也被评为2016年的BLACKDUCK奖,有关CarbonData的相关资料如下:
官网地址: http://carbondata.apache.org 
Github: http://github.com/carbondata/carbondata 
Mailing list: dev@carbondata.incubator.apache.org 
cwiki:http://cwiki.apache.org/confluence/display/CARBONDATA/CarbonData+Home 
Jira地址: http://issues.apache.org/jira/browse/CARBONDATA
  本文主要介绍Spark2.1.0 + CarbonData1.0.0集群模式部署流程,并辅以一个小案例来讲解如何在Spark shell下使用CarbonData。 
2 准备工作 
2.1 集群规划 
id  hostname  mem  cpu  storage 
1  master  32G  Intel(R) Core(TM) i5-6400 CPU @ 2.70GHz  SATA3 7200RPM 4T  
2  slave1  32G  Intel(R) Core(TM) i5-6400 CPU @ 2.70GHz  SATA3 7200RPM 8T  
3  slave2  32G  Intel(R) Core(TM) i5-6400 CPU @ 2.70GHz  SATA3 7200RPM 8T  
4  slave3  32G  Intel(R) Core(TM) i5-6400 CPU @ 2.70GHz  SATA3 7200RPM 8T  
5  slave4  32G  Intel(R) Core(TM) i5-6400 CPU @ 2.70GHz  SATA3 7200RPM 8T  
2.2 系统环境 
  操作系统
  下载地址:http://mirrors.163.com/
  建议版本:Unix-like environment (Linux, Mac OS X)
  版本查看:
# 示例(CentOS) 
[hadoop@master ~]$ cat /etc/redhat-release  
CentOS release 6.8 (Final) 
JDK 
  下载地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html
  建议版本:JDK1.8.0+
  版本查看:
[hadoop@master ~]$ java -version 
java version "1.8.0_60" 
Git 
  下载地址:http://git-scm.com/book/en/v2/Getting-Started-Installing-Git
  建议版本:无
  版本查看:
[hadoop@master ~]$ git --version 
git version 1.7.1 
  Maven
  下载地址:http://maven.apache.org/download.cgi
  建议版本:3.0.4
  版本查看:
[hadoop@master ~]$ mvn -v 
Apache Maven 3.0.4 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00) 
Maven home: /opt/maven-3.0.4 
  下载地址:http://hadoop.apache.org/#Download+Hadoop
  建议版本:2.7.2
  版本查看:
[hadoop@master ~]$ hadoop version 
Hadoop 2.7.2 
[hadoop@master ~]$ echo $HADOOP_HOME 
/opt/hadoop-2.7.2 
  Scala
  下载地址:http://www.scala-lang.org/
  建议版本:2.11.x
  版本查看:
[hadoop@master ~]$ scala -version 
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL 
  下载地址:http://spark.apache.org/downloads.html
  建议版本:2.1.0
  部署模式:Standalone/YARN
  版本查看:
[hadoop@master spark-2.1.0]$ ./bin/spark-submit --version 
Welcome to 
  ____              __
  / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
  /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
  /_/
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_60 
[hadoop@master ~]$ echo $SPARK_HOME 
/opt/spark-2.1.0 
  Thrift
  下载地址:http://thrift.apache.org/download
  建议版本:0.9.3
  版本查看:
[hadoop@master ~]$ thrift -version 
Thrift version 0.9.3 
3 编译及部署 
3.1 编译 
  Step 1:源码下载
$ git clone http://github.com/apache/incubator-carbondata.git carbondata 
  Step 2:修改Maven私有仓库地址(可选)
  由于网络原因,从Maven中央仓库下载jar包可能非常慢,大家可根据自己的实际情况修改为企业内部私有仓库或阿里云等外部源,如:
# 修改conf/setting.xml文件 
 
         
  nexus
  nexus
  http://maven.aliyun.com/nexus/content/groups/public/
  *
 
 
  Step 3:编译打包
[hadoop@master ~]$ cd carbondata 
[hadoop@master carbondata]$ mvn clean package -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 -Phadoop-2.7.2 
  在编译打包的过程中,maven会自动下载所依赖的jar包,但可能还会有部分jar包无法下载成功导致打包失败的情况,此时需要我们手动去网上下载并将对应的jar包放到Maven localRepository的对应目录下并重新执行上述命令,执行成功后,会出现以下提示:
[INFO] ------------------------------------------------------------------------ 
[INFO] Reactor Summary: 
[INFO] 
[INFO] Apache CarbonData :: Parent ........................ SUCCESS [  1.319 s] 
[INFO] Apache CarbonData :: Common ........................ SUCCESS [16:82 min] 
[INFO] Apache CarbonData :: Core .......................... SUCCESS [03:23 min] 
[INFO] Apache CarbonData :: Processing .................... SUCCESS [  8.623 s] 
[INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [  6.237 s] 
[INFO] Apache CarbonData :: Spark Common .................. SUCCESS [ 52.524 s] 
[INFO] Apache CarbonData :: Spark2 ........................ SUCCESS [ 50.118 s] 
[INFO] Apache CarbonData :: Spark Common Test ............. SUCCESS [ 25.072 s] 
[INFO] Apache CarbonData :: Assembly ...................... SUCCESS [  5.521 s] 
[INFO] Apache CarbonData :: Spark2 Examples ............... SUCCESS [  8.742 s] 
[INFO] ------------------------------------------------------------------------ 
[INFO] BUILD SUCCESS 
[INFO] ------------------------------------------------------------------------ 
[INFO] Total time: 29:42 min 
[INFO] Finished at: 2017-03-11T09:26:38+08:00 
[INFO] Final Memory: 211M/2513M 
[INFO] ------------------------------------------------------------------------ 
3.2 集群模式部署 
  注:本节是基于Spark Standalone模式的配置部署,Spark on Yarn模式下的配置部署类似但不完全相同,具体可参照文档:
  - Installing and Configuring CarbonData on Standalone Spark Cluster
  - Installing and Configuring CarbonData on “Spark on YARN” Cluster
  部署步骤如下:
  Step 1:复制CarbonData jar包
# 创建目录并拷贝 
[hadoop@master spark-2.1.0]$ mkdir $SPARK_HOME/carbonlib 
[hadoop@master spark-2.1.0]$ cp -r ~/carbondata/assembly/target/scala-2.11/carbondata_2.11-1.0.0-incubating-shade-hadoop2.7.2.jar ./carbonlib 
# 编辑环境变量 
[hadoop@master ~]$ vi /opt/spark-2.1.0/conf/spark-env.sh 
# 添加以下配置 
export SPARK_CLASSPATH=$SPARK_CLASSPATH:${SPARK_HOME}/carbonlib/* 
  Step 2:配置carbon.properties
# 拷贝至spark conf目录 
[hadoop@master ~]$ cp ~/carbondata/conf/carbon.properties.template /opt/spark-2.1.0/conf/carbon.properties 
# 修改配置 
#Mandatory. Carbon Store path 
carbon.storelocation=hdfs://master:9000/opt/carbonStore 
#Base directory for Data files 
carbon.ddl.base.hdfs.url=hdfs://master:9000/opt/data 
#lock文件存储方式 
carbon.lock.type=HDFSLOCK 
  Step 3:配置spark-defaults.conf
[hadoop@master ~]$ vi /opt/spark-2.1.0/conf/spark-defaults.conf 
# 添加以下配置 
spark.executor.extraJavaOptions         -Dcarbon.properties.filepath=/opt/spark-2.1.0/conf/carbon.properties 
spark.driver.extraJavaOptions           -Dcarbon.properties.filepath=/opt/spark-2.1.0/conf/carbon.properties 
  Step 4:分发至集群各节点
# Spark - conf 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/conf/ hadoop@slave1:/opt/spark-2.1.0/ 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/conf/ hadoop@slave2:/opt/spark-2.1.0/ 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/conf/ hadoop@slave3:/opt/spark-2.1.0/ 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/conf/ hadoop@slave4:/opt/spark-2.1.0/   
# Spark - carbonlib 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/carbonlib/ hadoop@slave1:/opt/spark-2.1.0/ 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/carbonlib/ hadoop@slave2:/opt/spark-2.1.0/ 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/carbonlib/ hadoop@slave3:/opt/spark-2.1.0/ 
[hadoop@master ~]$ scp -r /opt/spark-2.1.0/carbonlib/ hadoop@slave4:/opt/spark-2.1.0/ 
4 数据准备 
4.1 数据描述 
  为更聚焦于整个流程,我们的数据集仅含一张表,共21个字段,详细信息如下:
ID  COLUMN  TYPE  CARDINALITY  COMMENT 
1  id  BIGINT  TOTAL_NUM  ID  
2  order_code  STRING  TOTAL_NUM  订单编号  
3  sales_area_id  INT  100  销售区域ID  
4  sales_id  INT  10000  销售人员ID  
5  order_inputer  INT  100  录单人员ID  
6  pro_type  STRING  1000  产品型号  
7  currency  INT  50  订单币种  
8  exchange_rate  DECIMAL  1000  当前汇率  
9  unit_cost_price  DECIMAL  10000  成本单价  
10  unit_selling_price  DECIMAL  10000  销售单价  
11  order_num  INTEGER  1000  订单数量  
12  order_amount  DECIMAL  /  订单金额  
13  order_discount  DOUBLE  9  订单折扣  
14  order_account_amount  DECIMAL  /  实际金额  
15  order_time  TIMESTAMP  8000000  下单时间  
16  delivery_channel  INT  80  发货渠道ID  
17  delivery_address  STRING  10000000  发货地址  
18  recipients  STRING  10000000  收件人  
19  contact  STRING  10000000  联系方式  
20  delivery_date  DATE  10000  发货日期  
21  comments  STRING  10000000  备注  
4.2 数据生成 
  我们编写一段程序来解决数据批量生成的问题,为方便有兴趣的童鞋直接使用,所有代码都写在一个Java类中,相关配置(如数据量等)可直接在代码中修改。当然大家也完全可以用自己的数据进行测试。数据生成的代码如下(注:需使用JDK1.8及以上版本编译运行):
package com.cvte.bigdata.pt; 
import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 
import java.nio.file.StandardOpenOption; 
import java.time.LocalDate; 
import java.time.LocalDateTime; 
import java.time.format.DateTimeFormatter; 
import java.util.UUID; 
import java.util.concurrent.ArrayBlockingQueue; 
public class BatchDataGenerate implements Runnable { 
  // total record
  private final Integer TOTAL_NUM = 1 * 10000 * 10000;
  // batch size of writing to file and echo infos to console
  private final Integer BATCH_SIZE = 10 * 10000;
  // capacity of the array blocking queue
  private final Integer QUEUE_CAPACITY = 100 * 10000;
  // csv file path that save the data
  private final String DEST_FILE_PATH = "E:\\order_detail.csv";
  // the queue which the write thread write into and the read thread read from
  private ArrayBlockingQueue queue = new ArrayBlockingQueue(QUEUE_CAPACITY);
  // the cardinalities of the fields(dimensions) which means the distinct
  // number of a column
  private final int CARDINALITY_SALES_AREA_ID = 100;
  private final int CARDINALITY_SALES_ID = 10000;
  private final int CARDINALITY_ORDER_INPUTER = 100;
  private final int CARDINALITY_PRO_TYPE = 1000;
  private final int CARDINALITY_CURRENCY = 50;
  private final int CARDINALITY_EXCHANGE_RATE = 1000;
  private final int CARDINALITY_UNIT_COST_PRICE = 10000;
  private final int CARDINALITY_UNIT_SELLING_PRICE = 10000;
  private final int CARDINALITY_ORDER_NUM = 1000;
  private final int CARDINALITY_ORDER_DISCOUNT = 9;
  private final int CARDINALITY_ORDER_TIME = 8000000;
  private final int CARDINALITY_DELIVERY_CHANNEL = 80;
  private final int CARDINALITY_DELIVERY_ADDRESS = 10000000;
  private final int CARDINALITY_RECIPIENTS = 10000000;
  private final int CARDINALITY_CONTACT = 10000000;
  private final int CARDINALITY_DELIVERY_DATE = 10000;
  private final int CARDINALITY_COMMENTS = 10000000;
  @Override
  public void run() {
  try {
  if ("tGenerate".equals(Thread.currentThread().getName())) {
  // data generating thread
  generateData();
  } else if ("tWrite".equals(Thread.currentThread().getName())) {
  // data writing thread
  saveDataToFile();
  }
  } catch (InterruptedException e) {
  e.printStackTrace();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  /**
  * @Description: generating data for table order_details and called by thread tGenerate
  * @param @throws InterruptedException
  * @return void
  * @throws
  * @Author liyinwei
  * @date 2017年3月9日 下午7:19:16
  */
  private void generateData() throws InterruptedException {
  for (int i = 0; i  20 order by sum_amount desc limit 100  
select delivery_date, avg(exchange_rate) from pt.order_detail group by delivery_date having avg(exchange_rate) > 100 limit 100 
– 顺序查询  
select * from pt.order_detail where order_discount  ‘2018-01-01’ limit 10  
select recipients, contact from pt.order_detail where order_num 
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册!

本版积分规则

快速回复 返回顶部 返回列表