文档章节

大数据常用工具类

o
 osc_fmg49rzg
发布于 2019/03/20 11:15
字数 857
阅读 14
收藏 0

精选30+云产品,助力企业轻松上云!>>>

工具类
config.properties
# jbdc配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop101:3306/database?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=000000

# Kafka
kafka.broker.list=hadoop101:9092,hadoop102:9092,hadoop103:9092

# Redis配置
redis.host=hadoop101
redis.port=6379

# hive 的数据库名(选配)
hive.database=database
Properties.Util
import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {

   def load(propertieName: String): Properties = {
       val prop = new Properties();
       prop.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName),
           "UTF-8"))
       prop
  }

}
MyJdbcUtil
import com.alibaba.druid.pool.DruidDataSourceFactory
import java.sql.PreparedStatement
import java.util.Properties
import javax.sql.DataSource

object JdbcUtil {

   var dataSource: DataSource = init()

   def init() = {
       val properties = new Properties()
       val prop = PropertiesUtil.load("config.properties")

       properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
       properties.setProperty("url", prop.getProperty("jdbc.url"))
       properties.setProperty("username", prop.getProperty("jdbc.user"))
       properties.setProperty("password", prop.getProperty("jdbc.password"))
       properties.setProperty("maxActive", prop.getProperty("jdbc.datasource.size"))

       DruidDataSourceFactory.createDataSource(properties)

  }

   def executeUpdate(sql: String, params: Array[Any]): Int = { // "insert into xxx values (?,?,?)"
       var rtn = 0
       var pstmt: PreparedStatement = null
       val connection = dataSource.getConnection
       try {
           connection.setAutoCommit(false)
           pstmt = connection.prepareStatement(sql)

           if (params != null && params.length > 0) {
               for (i <- 0 until params.length) {
                   pstmt.setObject(i + 1, params(i))
              }
          }
           rtn = pstmt.executeUpdate()
           connection.commit()
      } catch {
           case e: Exception => e.printStackTrace
      }
       rtn
  }

   def executeBatchUpdate(sql: String, paramsList: Iterable[Array[Any]]): Array[Int] = {
       var rtn: Array[Int] = null
       var pstmt: PreparedStatement = null
       val connection = dataSource.getConnection
       try {
           connection.setAutoCommit(false)
           pstmt = connection.prepareStatement(sql)
           for (params <- paramsList) {
               if (params != null && params.length > 0) {
                   for (i <- 0 until params.length) {
                       pstmt.setObject(i + 1, params(i))
                  }
                   pstmt.addBatch()
              }
          }
           rtn = pstmt.executeBatch()
           connection.commit()
      } catch {
           case e: Exception => e.printStackTrace
      }
       rtn
  }

   // 测试
   def main(args: Array[String]): Unit = {
//       JdbcUtil.executeUpdate("insert into table_1 values(?,?,?,?,?)", Array("take100", "100", 100, 200,300))
       JdbcUtil.executeBatchUpdate("insert into table_1 values(?,?,?,?,?)",List(Array("take101", "100", 200, 200,200),Array("take102", "100", 300, 300,300)))
  }
}
MyRedisUtil
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object MyRedisUtil {

   var jedisPool: JedisPool = null

   def getJedisClient: Jedis = {
       if (jedisPool == null) {
           println("开辟一个连接池")
           val prop = PropertiesUtil.load("config.properties")
           val host = prop.getProperty("redis.host")
           val port = prop.getProperty("redis.port").toInt

           val jedisPoolConfig = new JedisPoolConfig()
           jedisPoolConfig.setMaxTotal(100)  //最大连接数
           jedisPoolConfig.setMaxIdle(20)   //最大空闲
           jedisPoolConfig.setMinIdle(20)     //最小空闲
           jedisPoolConfig.setBlockWhenExhausted(true)  //忙碌时是否等待
           jedisPoolConfig.setMaxWaitMillis(500)//忙碌时等待时长 毫秒
           jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试

           jedisPool = new JedisPool(jedisPoolConfig, host, port)
      }
       println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
       println("获得一个连接")
       jedisPool.getResource
  }
   
}
MyKafkaUitl
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {

   private val properties: Properties = PropertiesUtil.load("config.properties")
   val broker_list = properties.getProperty("kafka.broker.list")

   // kafka消费者配置
   val kafkaParam = Map(
       "bootstrap.servers" -> broker_list,//用于初始化链接到集群的地址
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       //用于标识这个消费者属于哪个消费团体
       "group.id" -> "gmall_consumer_group",
       //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
       //可以使用这个配置,latest自动重置偏移量为最新的偏移量
       "auto.offset.reset" -> "latest",
       //如果是true,则这个消费者的偏移量会在后台自动提交,但是kafka宕机容易丢失数据
       //如果是false,会需要手动维护kafka偏移量
       "enable.auto.commit" -> (true: java.lang.Boolean)
  )

   // 创建DStream,返回接收到的输入数据
   // LocationStrategies:根据给定的主题和集群地址创建consumer
   // LocationStrategies.PreferConsistent:持续的在所有Executor之间分配分区
   // ConsumerStrategies:选择如何在Driver和Executor上创建和配置Kafka Consumer
   // ConsumerStrategies.Subscribe:订阅一系列主题
   def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
       val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
       dStream
  }
}
MyEsUtil
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}
import java.util.Objects

object MyEsUtil {
   private val ES_HOST = "http://hadoop101"
   private val ES_HTTP_PORT = 9200
   private var factory: JestClientFactory = null

   /**
     * 获取客户端
     *
     * @return jestclient
     */
   def getClient: JestClient = {
       if (factory == null) build()
       factory.getObject
  }

   /**
     * 关闭客户端
     */
   def close(client: JestClient): Unit = {
       if (!Objects.isNull(client)) try
           client.shutdownClient()
       catch {
           case e: Exception =>
               e.printStackTrace()
      }
  }

   /**
     * 建立连接
     */
   private def build(): Unit = {
       factory = new JestClientFactory
       factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
          .maxTotalConnection(20) //连接总数
          .connTimeout(10000).readTimeout(10000).build)

  }

   // 批量插入
   def insertBulk(indexName: String, docList: List[Any]): Unit = {
       val jest: JestClient = getClient
       val bulkBuilder = new Bulk.Builder
       bulkBuilder.defaultIndex(indexName).defaultType("_ex")
       println(docList.mkString("\n"))
       for (doc <- docList) {

           val index: Index = new Index.Builder(doc).build()
           bulkBuilder.addAction(index)
      }
       val result: BulkResult = jest.execute(bulkBuilder.build())
       println(s"保存es= ${result.getItems.size()} 条")
       close(jest)
  }

   // 测试
   def main(args: Array[String]): Unit = {
       val jest: JestClient = getClient
       val doc = "{\n \"name\":\"yiyi\",\n \"age\": 17\n}"
       val index: Index = new Index.Builder(doc).index("myesutil_test").`type`("_doc").build()
       jest.execute(index)
  }

}
 
---------------------
原文:https://blog.csdn.net/qq_31108141/article/details/88367058

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
【全集】大数据Java基础

课程介绍 本课程是由猎豹移动大数据架构师,根据Java在公司大数据开发中的实际应用,精心设计和打磨的大数据必备Java课程。通过本课程的学习大数据新手能够少走弯路,以较短的时间系统掌握大...

osc_gu0nti2n
2019/10/13
2
0
大数据高端班划重点 hadoop常用四大模块文件

1.core-site.xml(工具模块)。包括Hadoop常用的工具类,由原来的Hadoopcore部分更名… 1.core-site.xml(工具模块)。包括Hadoop常用的工具类,由原来的Hadoopcore部分更名而来。主要包括系统配...

金豆8
2019/06/12
0
0
基本类型包装、拆装箱 pm:Math、arrays类 大数据运算

基本类型包装 就是转换用 8个类型 就是8个方法 除了int(Integer) 和char(Character)之外,其他6个都是首字母大写 字符串--->基本类型 得出结论 pare 基本数据类型 (String s) 基本数据...

osc_r590b6ja
2019/05/27
2
0
我的知识栈:目录(更新中...)

目录 Linux 网络通信 软件工程 云计算与大数据 爬虫,python 前端,微信小程序(HTML/CSS/JS) 面向对象设计(Java) 存储与服务器 项目总结 其他 面向对象设计模式 面向对象设计模式总结 23种...

osc_l3xti0dr
2019/06/12
2
0
0基础学习大数据你需要了解的学习路线和方向

现在大数据这么火,各行各业想转行大数据,那么问题来了,该往哪方面发展,哪方面最适合自己? 首先从字面来了解一下大数据 大数据 (巨量数据集合(IT行业术语)) 大数据(big data),指无...

琳达老师
2018/06/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

linux下java环境搭建

1、jdk下载: 官方地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html 如下图所示,我这边选择的是红框中的版本 2、压缩包上传至服务器 将下载的压缩包上传...

wc_飞豆
46分钟前
17
0
面试题:Java对象不再使用时,为什么要赋值为null?

前言 许多Java开发者都曾听说过“不使用的对象应手动赋值为null“这句话,而且好多开发者一直信奉着这句话;问其原因,大都是回答“有利于GC更早回收内存,减少内存占用”,但再往深入问就回...

码农突围
48分钟前
22
0
设计模式(5) 原型模式

原型模式 原型模式的适用场景 浅拷贝 深拷贝 用Initialize方法修改初始化状态 原型模式与之前学习的各种工厂方法、单例模式、建造者模式最大、最直观的区别在于,它是从一个既有的对象“克隆...

zhixin9001
48分钟前
7
0
获取免费的pycharm激活码网站

http://www.lookdiv.com/

云烟成雨forever
48分钟前
27
0
用Helm部署Kubernetes应用,支持多环境部署与版本回滚

1 前言 Helm是优秀的基于Kubernetes的包管理器。利用Helm,可以快速安装常用的Kubernetes应用,可以针对同一个应用快速部署多套环境,还可以实现运维人员与开发人员的职责分离。现在让我们安...

南瓜慢说
49分钟前
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部