新用戶登錄后自動(dòng)創(chuàng)建賬號(hào)
登錄大數(shù)據(jù)的批處理和實(shí)時(shí)處理模式已經(jīng)存在了。但是,還沒有一種模式可以允許我們實(shí)時(shí)批處理非獨(dú)立數(shù)據(jù)。合作伙伴一旦收到操作指導(dǎo),Expedia的營(yíng)銷團(tuán)隊(duì)需要分析相互依賴的數(shù)據(jù)集?,F(xiàn)存的系統(tǒng)運(yùn)行于一個(gè)本地Hadoop集群中,但是整個(gè)團(tuán)隊(duì)一直在努力達(dá)到內(nèi)部的SLA(服務(wù)等級(jí)協(xié)議)。這些信息也是有時(shí)效的,更快地獲取數(shù)據(jù)意味著給合作伙伴更好的操作指導(dǎo)。
從事于Expedia研究的Pariveda小組參與AWS的Solutions Architects(解決方案架構(gòu))研究來應(yīng)對(duì)三個(gè)明顯的挑戰(zhàn):如何在源數(shù)據(jù)可用后盡快傳遞分析結(jié)果;如何處理相互依賴但又在不同時(shí)間產(chǎn)生的數(shù)據(jù)集;如何管理不同時(shí)間達(dá)到的數(shù)據(jù)集之間的從屬關(guān)系。
在本文中,我將會(huì)描述Expedia,Pariveda和AWS團(tuán)隊(duì)是如何以 AWS Lambda,Amazon DynamoDB,Amazon EMR和Amazon S3為組成部分,找出獨(dú)特方法來實(shí)時(shí)處理數(shù)據(jù)的。你將會(huì)學(xué)到在不管理任何基礎(chǔ)設(shè)施的情況下如何實(shí)施一個(gè)相似的傳遞途徑。
解除數(shù)據(jù)集間的相互依賴關(guān)系
我們需要的解決的一個(gè)問題是數(shù)據(jù)集間的相互依賴關(guān)系。我們的目標(biāo)是為另一個(gè)系統(tǒng)提供統(tǒng)一,清潔的數(shù)據(jù)輸入源以便進(jìn)行更加詳細(xì)的分析。為了創(chuàng)建這些數(shù)據(jù)輸入源,每天來自多個(gè)合作伙伴和內(nèi)部系統(tǒng)的不同種類的成百上千的數(shù)據(jù)集必須經(jīng)過接收,聚合和查詢。每一種數(shù)據(jù),每一個(gè)合作伙伴的數(shù)據(jù)達(dá)到的時(shí)間都是不同的。這意味著數(shù)據(jù)處理過程需要持續(xù),直到一個(gè)數(shù)據(jù)輸入源所需的所有數(shù)據(jù)都到達(dá)。
下面概括的解決方案就是我們的成果。我們使用從屬于一個(gè)S3桶的AWS Lambda來更新DynamoDB中定義的任務(wù)。任務(wù)定義包括名稱,非獨(dú)立文件的列表,它們的狀態(tài)(已到達(dá)還是未到達(dá))和在EMR中運(yùn)行該任務(wù)所需的參數(shù)。一旦一個(gè)特定任務(wù)所需的所有文件都到達(dá)了,lambda函數(shù)就會(huì)更新任務(wù)隊(duì)列,在EMR中啟動(dòng)一個(gè)集群。EMR將結(jié)果再推回到S3以便使用S3的應(yīng)用程序在需要該結(jié)果時(shí)可以提取到它們。
配置任務(wù)
系統(tǒng)的核心時(shí)任務(wù)。任務(wù)對(duì)象保存了所有的信息,這些信息是確定數(shù)據(jù)依賴關(guān)系,依賴關(guān)系狀態(tài)和待發(fā)生的處理結(jié)果所必需的。通過定義任務(wù),你可以配置所有需要發(fā)生的work。從任務(wù)表中,可以輕松地看到所有的數(shù)據(jù)依賴關(guān)系。
建立S3事件與任務(wù)之間的映射
當(dāng)我們從S3獲取事件時(shí),AWS Lambda中引發(fā)的事件只有關(guān)于S3中被修改的對(duì)象的上下文。從S3中直接獲取的數(shù)據(jù)沒有關(guān)于這個(gè)任務(wù)的任何信息。但在Node.js這條只給出了一行代碼的信息中,我們確實(shí)得到了一條很珍貴的信息,那就是S3對(duì)象的新密鑰。
var srcKey = event.Records[0].s3.object.key;
從這個(gè)新密鑰,我們需要一種方式來獲取任務(wù)信息。為了達(dá)到這個(gè)目的,我們創(chuàng)建了FileUnit表。這一表實(shí)際上完全改變了這個(gè)任務(wù),將S3密鑰作為打開表的范圍密鑰,而任務(wù)密鑰作為數(shù)據(jù)負(fù)載。這使我們獲取了源密鑰,通過一次DynamoDB查詢就可以算出我們擁有的任務(wù)。
從這里,我們可以更新任務(wù),確定是否所有的依賴數(shù)據(jù)都已經(jīng)到達(dá),并啟動(dòng)亞馬遜EMR。
產(chǎn)生的流程圖
創(chuàng)建DynamoDB表
我們?cè)贒ynamoDB中創(chuàng)建如下三個(gè)表:
Task表
針對(duì)Task表,我們使用 HashKey/RangeKey Primary Key配置,以日期作為hash密鑰,以TaskKey字符串作為范圍密鑰。這個(gè)表可以使用任何名稱,只要在你所有的任務(wù)中,該名稱是唯一的即可。而針對(duì)Expedia項(xiàng)目,從hash密鑰的角度看,Date并不遵守時(shí)間系列數(shù)據(jù)的準(zhǔn)則,所以你可以創(chuàng)建另一個(gè)可預(yù)測(cè)的hash密鑰。但是如果你的任務(wù)是以天為基礎(chǔ)重復(fù)的,那么Date是一個(gè)很好的hash密鑰的選擇,因?yàn)樗阌诤竺娴乃阉鳌?/p>
這里只是一個(gè)樣例條目:
在創(chuàng)建表格時(shí),我們只需關(guān)心TaskKey和Date參數(shù)。但是輸入文件(要注意這些文件在S3上的路徑)和ScriptParameters對(duì)整個(gè)系統(tǒng)的運(yùn)行是必需的。該任務(wù)在控制臺(tái)創(chuàng)建。在實(shí)際操作中,這些配置信息應(yīng)該在數(shù)據(jù)文件被加載前以設(shè)定的頻率從某個(gè)文件中加載。
FileUnit表
FileUnit是對(duì)使用S3路徑的Task表格的一個(gè)引用。它有三個(gè)屬性:
Date(作為hash密鑰)
Filename(作為范圍密鑰)–指明Filename文件在S3上的路徑
Task–待引用的任務(wù)
在所有的實(shí)踐中,Date并不是FileUnit表格的必須屬性。實(shí)際上,如果你能避免這一屬性而且不影響任務(wù)的運(yùn)行,那會(huì)是更好的選擇,但是這一屬性卻能更好地支持我們的描述。如果你的任務(wù)名并不是以天為基礎(chǔ)重復(fù),使用S3路徑作為hash密鑰,使用任務(wù)名稱作為范圍密鑰會(huì)是更好的選擇。這可以使你在根據(jù)hash密鑰查詢的同時(shí),管理那些在多個(gè)任務(wù)之間相互依賴的數(shù)據(jù)集變得更簡(jiǎn)單。
Batch表
應(yīng)該創(chuàng)建Batch表格,并以Date屬性作為hash密鑰,以Task屬性為范圍密鑰。Task的值將會(huì)與Task表格中的TaskKey的值相同。為了便于查詢,我們也為Batch表格添加了全局備用索引,并以日期為hash密鑰,以ProcessingState為范圍密鑰。這幫助我們輕松地查詢未處理的項(xiàng)目。
測(cè)試數(shù)據(jù)
為了測(cè)試,在Task表格中創(chuàng)建一個(gè)與上面類似的條目。確保使用屬性名稱所指定的輸入路徑,將它們的值設(shè)為NULL。下一步,獲取這些輸入路徑,在FileUnit表格中創(chuàng)建條目。路徑名稱必須完全匹配Filename列的值(包括大小寫)。FileUnit表中Task的值必須匹配任務(wù)的TaskKey值。要使用上面的Task樣例,你將創(chuàng)建如下三個(gè)FileUnit條目:
表創(chuàng)建完畢,測(cè)試數(shù)據(jù)加載完畢后,我們可以實(shí)施AWS Lambda函數(shù)。
寫AWS Lambda函數(shù)
代碼框架
代碼框架與上面展示的流程圖很相似。
我們將會(huì)粉飾代碼,將已完成的項(xiàng)目添加到Batch表中因?yàn)樵摯a與上面的updateTask函數(shù)非常相似。但我們確實(shí)是使用putItem函數(shù)而非updateItem函數(shù)。
啟動(dòng)EMR
在EMR中啟動(dòng)任務(wù)是很簡(jiǎn)單的。我們啟動(dòng)一個(gè)EMR集群,然后添加一個(gè)工作流程步驟。有很多的配置代碼,但是實(shí)質(zhì)是簡(jiǎn)單的。你必須安裝合適的應(yīng)用來完成你的工作,在本案例中,你必須安裝Hive,所以你在啟動(dòng)EMR時(shí)會(huì)看到工作流程步驟被添加到EMR中。
從這里,我們通過使用腳本參數(shù)調(diào)用addJobFlowSteps函數(shù),將處理任務(wù)添加到工作流程中。有一個(gè)很小的轉(zhuǎn)換步驟需要在這里進(jìn)行。你可以在GitHub知識(shí)庫(kù)中找到該轉(zhuǎn)換代碼。
部署AWS Lambda函數(shù)
為了在AWS Lambda函數(shù)中部署該應(yīng)用,你需要:
1.從GitHub下載源代碼。
2.使用 npminstall工具來安 裝async所依賴的部件。
3.在FunctionConstants.js文件中更新logsPath的值,使其指向某一個(gè)桶,并在你希望EMR放置日志文件的路徑前加前綴。
4.將函數(shù)打包,并部署該函數(shù),如本例或演練步驟中所示。
5.確保你的Lambda Execution IAM角色有以下權(quán)限:
a.在DynamoDB中–調(diào)用getItem,updateItem和putItem函數(shù)的權(quán)限
b.在EMR中–調(diào)用startJobFlow和addJobFlowItem函數(shù)的權(quán)限
在控制臺(tái)上進(jìn)行快速測(cè)試
確保已正確配置了所有參數(shù),你可以在AWS控制臺(tái)上打開AWS Lambda函數(shù)的Edit/Test頁(yè)面,模擬添加到S3的新文件:
使用S3樣本事件,修改s3部分和object部分的參數(shù)值,觸發(fā)模擬任務(wù)中文件的事件。
在Execution結(jié)果窗口你應(yīng)該可以看到消息Files processed successfully(消息處理成功)。
將S3桶事件發(fā)布到AWS Lambda函數(shù)
從控制臺(tái)中,從Actions菜單中選擇Add event source選項(xiàng),將S3桶中的Object Created事件添加到AWS Lambda函數(shù)中,將關(guān)于事件源的信息設(shè)置為S3桶:
端對(duì)端測(cè)試
既然一切都已準(zhǔn)備就緒,你可以在S3桶中創(chuàng)建新文件了。你應(yīng)該看到與如下截屏所類似的信息:
如果EMR集群還未啟動(dòng),你可以查看該功能所產(chǎn)生的CloudWatch日志,確定問題所在。你也可以通過控制臺(tái)模擬所有的文件達(dá)到情況來確定功能收到錯(cuò)誤信息的部位。
祝賀你,你已經(jīng)建立了一個(gè)工作系統(tǒng)!
優(yōu)化
有若干方法可以優(yōu)化該系統(tǒng),取決于你的使用場(chǎng)景。下面是一些優(yōu)化案例。
一個(gè)文件對(duì)多個(gè)任務(wù)
如果多個(gè)任務(wù)以來同一個(gè)文件,你必須適當(dāng)?shù)卣{(diào)整FileUnit表,然后調(diào)整任務(wù)查詢相關(guān)參數(shù)來處理結(jié)構(gòu)的變化。你可以使用上面描述的格式(文件名作hash密鑰)或者你可以保留格式但是將Task表中條目的值設(shè)為一組值而非一個(gè)值。
清掃任務(wù)
如果你的任務(wù)很小,你預(yù)期數(shù)據(jù)以某一頻率到達(dá),你可以調(diào)整批處理的任務(wù)大小到大于1 (該值是在配置文件中配置的)。如果你這樣設(shè)置了,你可能想要添加一個(gè)清掃功能,使其以定時(shí)器規(guī)定的時(shí)間來清除可能還未運(yùn)行的任務(wù)。以這種方式,你獲取了效率,待處理的任務(wù)在運(yùn)行前不用等待太長(zhǎng)時(shí)間來滿足批處理的個(gè)數(shù)限制
找回密碼
注冊(cè)賬號(hào)