亚洲国产中文精品久久电影欧美_日韩午夜a级毛片免费观看_中文无码在线电影_午夜激情影院操一操_最近中文字幕免费版在线_亚洲国产精品无码久久网速快_在线观看黄片国产_亚洲AⅤ永久无码精品_国产三级网站免费观看_亚洲人与动人物A级毛片免费视频

第十周,大數(shù)據(jù)建模學(xué)習(xí)應(yīng)用于模具行業(yè)的培訓(xùn)班

2019-11-03 16:09:50

早上, 很早到書城, 今天最后一天培訓(xùn), 還是在  SPARK 復(fù)習(xí)。


        開發(fā) SPARK 使用  scala 、java 、 python 、 api 以及    shell 語言開發(fā)的  搜索引擎。


       缺點:    不適合  web服務(wù)  ,   dao層    、web爬蟲 


       優(yōu)點:    伯克利數(shù)據(jù)分析 生態(tài)圈  , 機器學(xué)習(xí) , 數(shù)據(jù)挖掘, 數(shù)據(jù)庫, 信息檢索, 自然語言處理, 語音識別

                      以  spark core 為核心 , 從  hdfs  , amazon  s3  hbase 等持久讀取數(shù)據(jù)


 image.png


SPARK SQL  基于內(nèi)存的, 



image.png

查看數(shù)據(jù)庫

深圳塑膠模具廠,深圳市模具廠,深圳模具廠,深圳模具,深圳塑膠模具

image.png

image.png

image.png

image.png


image.png



這個在前端也能看到,點擊進去, 詳細(xì)說明。


image.png

       image.png

image.png

image.png



image.png

image.png

image.pngimage.png


HIVE 不支持事物的, 增刪改查。 ACRD 使用 


image.png



在那一臺機器完成的, 可以看看

image.png


提交的人物, 由系統(tǒng)分配節(jié)點去跑。


image.png

image.png



image.png





HIVE 主要做 數(shù)據(jù)倉庫  DW   , 

image.png



插入數(shù)據(jù)完成, 做一下  查詢數(shù)據(jù)

image.png



將  HIVE 的內(nèi)核, 改成  SPARK 以后, 提升速度 80倍

image.png



image.png



image.png




上面是   HOVE 的插入和查詢的過程。


接著跑  SPARK SQL 的過程


image.png


image.png


所有的  數(shù)據(jù)庫和表, 都是存儲在  HDFS 上的。


現(xiàn)在要用  SPARK SQL 來執(zhí)行。


image.png


image.png


image.png


show database  

use  

select 

image.png


image.png




這個是 SPARK的管理頁面


image.png



image.png

image.png


image.png

image.png




image.png

image.png

image.png

image.png

image.png

image.png

image.png


image.png

image.png

image.png


image.png


DAG 的有向性的結(jié)合圖



DAG Scheduler 





image.png


RDD 經(jīng)過 4個 聚合步驟后, 形成一個  DAG , 多個 DAG 組合編程  DAG計劃表


下面是  scala 寫的


image.png



要統(tǒng)計下面文件中的個數(shù)


image.png


image.png

上述雖然是  一行代碼, 但是也顯示的淋漓盡致


image.png


image.png




運行的狀況


image.png



image.png


image.png




image.png


image.png


image.png



管理頁面的結(jié)果image.png


image.png


環(huán)境配置image.png


image.png



image.png


image.png


SPARK   單詞統(tǒng)計的 DEMO  , 接下來再看看


image.png

image.png


image.png



SPARK STREM 消費kaFka 的數(shù)據(jù),  管理者確定是   ZooKeeper


image.png



image.png



image.png


下午講解     Klin       做一些報表的功能, SPARK 




image.png

要搭建不同的集群, 由原來的 store 的程序, 遷移到  spark 上來,    Flink 是把交互式查詢和實時計算合在一起了。

Flink 也是用 Scala 來寫。

image.png


1、客戶端提交作業(yè)后,啟動Driver,Driver是Spark作業(yè)的Master(也就是通過Driver來啟動Receiver,定時去啟動任務(wù)的處理,

注意的是,驅(qū)動啟動任務(wù)會受前一個任務(wù)執(zhí)行的影響。也就是前一個任務(wù)沒有執(zhí)行完成后,是不會啟動后邊的任務(wù)的。 

所以,注意你的streaming的執(zhí)行時間,絕對不要超過Recive數(shù)據(jù)的時間)


2、每個作業(yè)包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個Receiver task。

(一個Executor就是一個spark進程,在yarn中就是一個container,這個大家應(yīng)該知道。

然后Receiver task是在driver中創(chuàng)建的,我理解一個Receiver是運行在一個Executor中的。

然后如果想要創(chuàng)建多個Receiver,那么需要大概這樣做(1 to 10).map(_.createStream…),這樣就能創(chuàng)建10個receiver task啦。 

注意這個數(shù)量當(dāng)然不能超過你的結(jié)點數(shù)量啦。 

還有個問題,通常使用kafka比較合適,因為kafka是stream向kafka來poll數(shù)據(jù)。

而他媽的flume默認(rèn)只支持pull,如果想支持poll,那需要定制sink,那真是太惡心了。)


3、Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報給Driver,然后備份到另外一個Executor上。

(默認(rèn)情況下接受數(shù)據(jù)是200毫秒生成一個block,我理解一個block應(yīng)該是一個partition?

這個還不確定,需要對照源代碼看一下;

然后會把生成的Block隨機扔到不同的Executor,同時,driver去派發(fā)任務(wù)時,也會找到就近的Executor。

我理解,節(jié)點中的所有executor都應(yīng)該會有數(shù)據(jù)才對)


4、ReceiverTracker維護Receiver匯報的BlockId。

    (這個ReceiverTracker應(yīng)該是維護在Driver中,Driver會根據(jù)維護的這些數(shù)據(jù)塊進行任務(wù)的派發(fā))


5、Driver定時生成JobGenerator,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。


6、JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,

      每個stage包含一到多個task。(我記得DAGScheduler會對任務(wù)做一層優(yōu)化)


7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護task的運行狀態(tài)。


8、當(dāng)tasks,stages,jobset完成后,單個batch(批處理)才算完成。


image.png

具體流程:

  1. 客戶端提交作業(yè)后啟動Driver,Driver是spark作業(yè)的Master

  2. 每個作業(yè)包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task。

  3. Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報給Driver,然后備份到另外一個Executor上。

  4. ReceiverTracker維護Reciver匯報的BlockId。

  5. Driver定時啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。

  6. JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個stage包含一到多個task。

  7. TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護task的運行狀態(tài)。

  8. 當(dāng)tasks,stages,jobset完成后,單個batch才算完成。



image.png



---------------------------------------------------   第十周  盧老師 講課筆記


Spark Streaming原理:


Spark Streaming是基于spark的流式批處理引擎,其基本原理是把輸入的數(shù)據(jù)以某一時間間隔批量的處理,當(dāng)批處理縮短到秒級時,便可以用于處理實時數(shù)據(jù)流。


kafka


flume                                        HDFS


hdfs      -------> spark streaming --------> DataBase


zeromq                   |                   Dashboards

                               | 

twitter                    |

                              |

input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data




live input stream --> spark Streaming --->t t t --->spark job --->results



sparkstreaming架構(gòu)


spark streaming作業(yè)流程:


Driver:

receivertracker

jobGenerator

JobScheduler

DagScheduler

TaskScheduler


BlockManagerMaster


Executor:

Receiver

BlockManagerSlave


Executor:

Task

BlockManagerSlave



運行機制:

1、客戶端提交作業(yè)后啟動一個Driver,Driver是spark作業(yè)的Master。


2、每個作業(yè)包含多個Executor,每個Executor以線程的方式運行Task,Spark Streaing至少包含一個receiver task。


3、Receiver接收數(shù)據(jù)后生成Block,并把Blockid匯報給Driver,然后備份到另外一個Executor上


4、ReceiverTracker維護Recivver匯報的BlockId。


5、Driver定時啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet,交給JobScheduler.


6、JobScheduler負(fù)責(zé)調(diào)度JobSet,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個Stages包含一個到多個Task。


7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護task的運行狀態(tài)。


8、當(dāng)tasks、stages、jobset完成后,單個batch才算完成。





Spark Streaming消費Kafka數(shù)據(jù):


import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


val kafkaParams = Map[String, Object](

  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",

  "key.deserializer" -> classOf[StringDeserializer],

  "value.deserializer" -> classOf[StringDeserializer],

  "group.id" -> "use_a_separate_group_id_for_each_stream",

  "auto.offset.reset" -> "latest",

  "enable.auto.commit" -> (false: java.lang.Boolean)

)


val topics = Array("topicA", "topicB")

val stream = KafkaUtils.createDirectStream[String, String](

  streamingContext,

  PreferConsistent,

  Subscribe[String, String](topics, kafkaParams)

)


stream.map(record => (record.key, record.value))





Spark統(tǒng)計單詞次數(shù):


text_file = sc.textFile("hdfs://...")

counts = text_file.flatMap(lambda line: line.split(" ")) \

             .map(lambda word: (word, 1)) \

             .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("hdfs://...")




SparkSQL數(shù)據(jù)查詢:


// Creates a DataFrame based on a table named "people"

// stored in a MySQL database.

val url =

  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"

val df = sqlContext

  .read

  .format("jdbc")

  .option("url", url)

  .option("dbtable", "people")

  .load()


// Looks the schema of this DataFrame.

df.printSchema()


// Counts people by age

val countsByAge = df.groupBy("age").count()

countsByAge.show()


// Saves countsByAge to S3 in the JSON format.

countsByAge.write.format("json").save("s3a://...")




Spark機器學(xué)習(xí)實現(xiàn)邏輯回歸算法:


// Every record of this DataFrame contains the label and

// features represented by a vector.

val df = sqlContext.createDataFrame(data).toDF("label", "features")


// Set parameters for the algorithm.

// Here, we limit the number of iterations to 10.

val lr = new LogisticRegression().setMaxIter(10)


// Fit the model to the data.

val model = lr.fit(df)


// Inspect the model: get the feature weights.

val weights = model.weights


// Given a dataset, predict each point's label, and show the results.

model.transform(df).show()









-------------------------------------------------------------------------------------------------------


最后做一次復(fù)習(xí) ,  有始有終。


image.png


image.png

-------在最下面增加 2行代碼。


image.png

如果有 幾千臺服務(wù)器, 必須配置免登錄,  生成的密鑰, 追加到這個文件中去。


image.png


image.png

image.png


hadoop 的官方網(wǎng)站, 有非常詳細(xì)的內(nèi)容。


image.png


image.png


多種方式都可以查看是不是啟動成功


image.png


image.png


image.png


image.png

50070端口, HDFS的端口


搭建完成后,     


image.png



下面是  Yarn 的配置信息

image.png


需要 shuffle  的, 需要增加 各類  shuffle .


FIFO     FAIR    先進先出    公平調(diào)度  


上面是 Yarn 的配置文件, 這是第三周講的


第四周講   Hbase , 是 google  的大表實現(xiàn)

image.png

image.png

image.png

image.png

image.png

image.png


image.png


image.png


image.png


image.png


image.png



image.png

image.png


image.png


image.png


image.pngimage.png

image.png


image.png


單機版和集群版的不同

image.png


image.png

image.png


image.png

image.png


image.png

image.png

image.png

image.png



image.pngimage.png


image.png


第5周 、 第六周 


image.png


image.png

image.png


image.png

image.png

一切階是對像


image.png

image.png


image.png

image.png

image.png



image.png


image.png


以上是   5周   Scala  



image.png




image.png


image.png


image.png



image.png



image.png


image.png


image.png



image.png

image.png


image.png


image.png




















首頁
產(chǎn)品
新聞
聯(lián)系