Post on 21-Jul-2020
transcript
MapReduce: teoria e implementazione
1
Genesi
• nasce nel 2004 ❒ [“MapReduce: simplified Data Processing on
Large Clusters”, Dean, Ghemawat. Google Inc.] • nasce dall’esigenza di eseguire problemi
“semplici” su big data (>1TB) - e.g., ❒ grep ❒ conteggio delle frequenze di accesso a url ❒ inverted index
2
Fino al 2004 • Large input data => soluzione di Google:
<<abbiamo tante risorse computazionali. Usiamole! Distribuiamo la computazione!>> ❒ Riformulare il programma per calcolo parallelo ❒ distribuire dati ❒ gestire guasti, etc.
• => il problema non è più semplice! • Per di più: occorre riformulare ogni singolo
problema (grep, inverted index, etc.) 3
Motivazioni in breve
① Voglio processare grandi moli di dati ② Voglio usare molte (centinaia/
migliaia) di CPU ③ Voglio che tutto questo sia
SEMPLICE
4
Nel 2004 • Dean e Ghemawat propongono MapReduce, un nuovo
modello di programmazione distribuita. • Risolve problemi di analisi di grandi insiemi di dati
(stesso insieme di istruzioni su tanti record diversi). • Si chiede al programmatore di convertire il suo
codice in due funzioni: ❒ map function ❒ reduce function
I programmi scritti secondo questo modello sono intrinsecamente parallelizzabili
5
Map & Reduce
value map key value key value key value key value key value key value key value key
…...
(key1, val)
(key8, val)
(key1, val)
(key8, val)
(key5, val)
(key1, val)
map
map
reduce
reduce
reduce
…... …...
value key value key value key value key value key value key value key value key
input data set intermediate
pairs output
data set sh
uffle
6
Come può esserci utile?
• Un esempio classico: WordCount
vorrei contare le occorrenze di ogni parola all’interno di una collezione di documenti
7
Map & Reduce
map
most poetry ignores most people
(most,1) (people,1) (ignore,1)
map
reduce
reduce
1 ignore 1 ignores
4most 2people 2poetry
input data set intermediate
pairs output
data set
most people ignore most poetry
(most,1) (poetry,1)
(most,1) (poetry,1) (ignores,1) (most,1) (people,1)
shuf
fle
phas
e
doc1
.txt
do
c2.t
xt
8
pseudo-codice //key:document name. value:document contents !map(String key, String value){!
!for each word w in value!! !EmitIntermediate(w, "1”)!
}!// key: a word. values: a list of counts!reduce(String key, Iterator values){!
!int result = 0;! !for each v in values{! !result += ParseInt(v);! ! !Emit(AsString(result));!
!}!}!!
9
Cosa c’è di bello? • Abbiamo scomposto il conteggio delle parole in
sotto-programmi che possono essere associate a diversi task. ❒ ogni map task lavora su un sottoinsieme dell’input ❒ ogni reduce task lavora su una chiave (merge) ❒ i reduce usano l’out dei map => R dopo M
• A parte questo, l’esecuzione può procedere in parallelo!
I programmi scritti secondo il modello MapReduce sono intrinsecamente parallelizzabili
10
Parallizzazione: map phase
macchina 1
macchina 2 macchina 3
macchina 4
doc1.txt! doc2.txt!
doc3.txt!
M M
M coordinatore
11
Parallizzazione: shuffle phase
macchina 1
macchina 2 macchina 3
macchina 4
doc1.txt! doc2.txt!
doc3.txt!
M
M M
coordinatore
R
R
12
Parallizzazione: reduce phase
macchina 1
macchina 2 macchina 3
macchina 4
doc1.txt! doc2.txt!
doc3.txt!
R
coordinatore
R most,4!people,2!ignore,1!
poetry,2!ignores,1!
13
Cos’altro si può fare?
Distributed grep: voglio filtrare le linee di un documento (molto grande) in cui appare una parola X. 1. Il documento è spezzettato in chunk. 2. map: filtra le linee del suo chunk che
contengono X 3. reduce: funzione identità
14
Cos’altro si può fare?
Frequenza di accesso a un URL: dato un log con l’elenco di URL visitati, voglio contare le frequenze di accesso per ogni URL. 1. Il documento è spezzettato in chunk. 2. map: analizza il suo chunk e per ogni URL
emette una coppia <URL, 1>!3. reduce: somma tutti i valori associati allo
stesso URL ed emette una coppia <URL,totalcount>!
15
Cos’altro si può fare? Inverted index: dato un elenco di documenti voglio per ogni parola l’elenco ordinato dei documenti che la contengono. 1. Ogni documento è affidato ad un map task (o
splittato tra più map task). 2. map: per ogni parola w nel documento docY
emette <w, docY>!3. reduce: prende tutte le coppie con chiave w, fa
il sort dei documentID ed emette una coppia <w,list(docID)>!
16
Cos’altro si può fare? Distributed sort: dato un elenco di record, ordinarli secondo un certo criterio. 1. Il documento è spezzettato in chunk 2. map: per ogni record estrae la chiave ed
emette delle coppie <key, value>!3. reduce: funzione identità
il sort funziona grazie ad un comportamento di default dei reduce, la garanzia di ordinamento, che vediamo più avanti...
17
Cos’altro si può fare? • Reverse Web-link Graph[*] • Term-Vector per Host[*] • Iterative MapReduce • ecc…
[*]“MapReduce: simplified Data Processing on Large Clusters”, Dean, Ghemawat. Google Inc. 6° Symposium on Operating Systems Design & Implementation (OSDI2004)
18
Nella pratica Esistono diversi tool per eseguire un programma MapReduce su architettura distribuita • Google MapReduce • Apache Hadoop • Apache Spark • Azure Twister
Permettono a programmatori senza alcuna esperienza di sistemi distribuiti di utilizzare facilmente le risorse di un data center per elaborare grandi moli di dati.
19
Apache Hadoop
• open source • struttura e interfaccia relativamente
semplici • fatto di due sotto-componenti:
MapReduce Runtime (coordinatore) Hadoop Distributed File System (HDFS)
• vediamo com’è fatto Hadoop v1.x.x
20
Namenode
HDFS file system distribuito che viene gestito da dei demoni con un’architettura master/slave
Datanode Datanode
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
il contenuto del file system NON è (in generale) replicato sui vari nodi, ma spezzettato tra i vari nodi
21
copio un file dal file system locale in HDFS
Hadoop si occupa dello split in modo trasparente all’utente.
Namenode
Datanode Datanode
file.txt!
:~$ hadoop dfs !–copyFromLocal !/path/to/file.txt !/path/in/hdfs/file.txt
blk001! blk002!
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
file.txt!
22
Verifico il contenuto di HDFS
• Il file mi viene mostrato come se fosse tutto intero su un unico supporto
• i comandi sono shell-like
hadoop@hadoop1:~$ hadoop dfs –ls!Found x items !... !drwxr-xr-x - hadoop root 2010-03-16 11:36 /user/hadoop/file.txt!... !
hadoop@hadoop1:~$ hadoop dfs –mkdir mydir!hadoop@hadoop1:~$ hadoop dfs –rmr mydir!!
23
HDFS Può gestire autonomamente anche la replicazione dei dati. Parametro di configurazione: dfs.replication=2 Utile in caso di failure o per velocizzare i calcoli
Namenode
Datanode Datanode
file.txt!
002!
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
file.txt!
001!002! 001!
24
Lancio un job wordcount (Hadoop v1.x.x.)
:~$ hadoop jar wordcount.jar WordCount indir outdir!
comando di lancio
contiene i .class
nome della classe che contiene il main
cartelle di input e
output nel dfs
Cosa succede in Hadoop quando lancio questo comando? Viene chiamato il componente MapReduce runtime.
25
MapReduce runtime (v1.x.x.) Componente di Hadoop che si occupa di generare e coordinare i map/reduce task. Architettura master/slave
JobTracker
TaskTraker
TaskTraker
Il jobtraker assegna il lavoro da fare (map/reduce tasks) ai tasktraker cercando di garantire il più possibile la località dei dati
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
26
MapReduce runtime: data locality JobTracker Namenode
TaskTraker Datanode
TaskTraker Datanode
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
blk001! blk002!
map blk001 map blk002 reduce key1to5 reduce key6to9
• Il jobtraker chiede al demone namenode: dov’è blk001?
• Il namenode risponde: su slave2
• jobtracker chiede al tasktracker 2: posso assegnarti un map task?
• tasktracker 2: sì • jobtracker alloca
map di blk001 a slave2
27
Manca qualcosa…
value map key value key value key value key value key value key value key value key
…...
(key1, val)
(key8, val)
(key1, val)
(key8, val)
(key5, val)
(key1, val)
map
map
reduce
reduce
reduce
…... …...
value key value key value key value key value key value key value key value key
shuf
fle
28
Manca qualcosa…
value map key value key value key value key value key value key value key value key
…...
(key1, val)
(key8, val)
(key1, val)
(key8, val)
(key5, val)
(key1, val)
map
map
reduce
reduce
reduce
…... …...
value key value key value key value key value key value key value key value key Come scelgo
quanti mapper e quanti reducer servono?
Come scelgo quanti map e reduce task servono?
29
Manca qualcosa…
value map key value key value key value key value key value key value key value key
…...
(key1, val)
(key8, val)
(key1, val)
(key8, val)
(key5, val)
(key1, val)
map
map
reduce
reduce
reduce
…... …...
value key value key value key value key value key value key value key value key
shuf
fle
Come scelgo quali chiavi vanno a un reduce task e quali all’altro?
30
Quanti M e R task? Vorremmo il maggior grado di parallelismo possibile => la risposta dipende da: 1. quale grado di parallelismo ho nella mia
architettura? Quanti core? con hyperthreading (capacità di eseguire 2 thread contemporaneamente sullo stesso core)?
2. quanti dati devo processare?
31
Quanti M e R task? Hadoop permette di specificare il grado di parallelismo di ogni macchina in configurazione esprimendo il numero di slot. uno slot è un contenitore in cui può finire un map/reduce task in esecuzione su una macchina dual-core con hyperthreding specificherò: • n° map slots = 4 • n° reduce slot = 4
poiché la fase di map e la fase di reduce non si sovrappongono (quasi) mai
core1 core2
32
Quanti map task lancio? Supponiamo di voler fare wordcount su un file da 1GB e di aver a 4 slave dual-core senza hyperthreading 8core totali => 8 slot Soluzione di default di Hadoop: • ogni map processa dei chunk di massimo 64MB • il file viene spezzato in blocchi di 64MB • Hadoop lancerà 1GB/64MB = 16 map task tutti insieme? NO, perché ho solo 8 slot!
M M
M M M M M M M M
M M M M M M 33
Quanti map task lancio? è la soluzione migliore? forse no… 1GB/8core = 128MB Se avessi avuto dei chunk da 128MB, avrei fatto tutto in parallelo con soli 8 task e meno cambi di contesto. Hadoop permette di cambiare il valore del chunck di default in configurazione, ma • occorrono nozioni di computazione distribuita • effettivi miglioramenti si notano solo con molti TB ⇒ solitamente hadoop decide da solo il numero di map numero di map task = numero di chunck da 64MB in input
M M M M M M M M
34
Quanti reduce task lancio? Questo lo decide l’utente! job.setNumReduceTasks(n);!Gli sviluppatori di hadoop consigliano due valori ottenuti statisticamente: 0.95 * n° tot di reduce slot!se i core sono tutti uguali oppure 1,75 * n° tot di reduce slot!se c’è qualche differenza di velocità tra i core delle varie macchine
R R R R
R R R R
R R R
35
Manca qualcosa…
value map key value key value key value key value key value key value key value key
…...
(key1, val)
(key8, val)
(key1, val)
(key8, val)
(key5, val)
(key1, val)
map
map
reduce
reduce
reduce
…... …...
value key value key value key value key value key value key value key value key
shuf
fle
Come scelgo quali chiavi vanno a un reduce task e quali all’altro?
36
Partitioning/Shuffle phase
map
most poetry ignores most people
(most,1) (people,1) (ignore,1)
map
reduce
reduce
1 ignore 1 ignores
4most 2people 2poetry
most people ignore most poetry
(most,1) (poetry,1)
(most,1) (poetry,1) (ignores,1) (most,1) (people,1)
doc1
.txt
do
c2.t
xt
Come scelgo quali chiavi vanno a un reducer e quali all’altro? ma sopratutto, come faccio a farlo velocemente?
37
Naive partitioning La prima cosa che viene in mente è: • ordino la lista delle coppie intermedie per chiave:
[ignore,ignores,most,people,poetry] • divido la lista per il numero di reduce (bilancio il carico):
R=2 => R1=[ignore,ignores] R2=[most,people,poetry] Problemi: • l’ordinamento è sempre oneroso O(n log2 n) • posso decidere a che reducer mandare una coppia solo
dopo che è stato definito tutto l’elenco delle chiavi. Infatti se l’elenco fosse stato:
[ignore,ignores,most,people,pippo,poetry] R1=[ignore,ignores,most] R2=[people,pippo,poetry]
38
Hadoop default partitioning Il suggerimento di Dean e Ghemawat è di usare una hash function. Per cui Hadoop implementa questo partitioner di default: public class HashPartitioner<K, V> !! ! ! ! !extends Partitioner<K, V> {!
public int getPartition(K key, V value, !! ! ! ! ! !int numReduceTasks) {!
return (key.hashCode() & Integer.MAX_VALUE) ! ! ! ! ! !% numReduceTasks;!
}!}!
bitwise AND per avere solo valori positivi
modulo per avere sempre un risultato nell’intervallo [0,numReduceTasks-1]
39
Hadoop default partitioning Risultato: getPartition(“ignore”,”1”,2) = 0 getPartition(“ignores”,”1”,2) = 1 getPartition(“most”,”1”,2) = 1 getPartition(“pippo”,”1”,2) = 0 getPartition(“people”,”1”,2) = 1 getPartition(“poetry”,”1”,2) = 1
• nessun ordinamento • le coppie possono essere
inviate ai R man mano che vengono prodotte dai M, senza attendere che si sia definita tutta la lista.
40
Hadoop default partitioning
macchina 1
macchina 2 macchina 3
macchina 4
doc1.txt! doc2.txt!
doc3.txt!
R
coordinatore
R ignore,1!pippo,1!
ignores,1!most,4!people,2!poetry,2!
sui grandi numeri questo garantisce anche un certo bilanciamento del carico
NB: garanzia di ordinamento. Il reducer emette sempre i risultati ordinati per chiave
41
Cos’altro si può fare? Distributed sort: dato un elenco di record, ordinarli secondo un certo criterio. 1. Il documento è spezzettato in chunk 2. map: per ogni record estrae la chiave ed
emette delle coppie <key, value>!3. reduce: funzione identità
il sort funziona grazie alla garanzia di ordinamento. Tutte le coppie arrivano ad UN SOLO reduce task, che le trasmette immutate sull’output, ma in ordine.
42
Combiner function In certi casi può essere vantaggioso far fare qualcosa di più ai map task anticipando il lavoro dei reducer.
Es: wordcount classico ❒ map emette per ogni w nel chunk , <w,1> ❒ reduce emette <w,sum(values)> !
Così ho • un sacco di traffico tra M e R: una coppia <w,1>
per ogni w nel documento. • Il reducer deve fare tutte le somme
43
Combiner function Dean e Ghemawat osservano che: Se la funzione del reduce è associativa e commutativa, posso parzialmente anticiparla, facendola eseguire sulla stessa macchina del map task.
Es: wordcount ❒ map emette per ogni w nel chunk , <w,1> ❒ combiner (sulla stessa macchina del map): per ogni <w,1> fa la somma ed emette <w,S > = <w,sum(ones)> ❒ reduce emette <w,sum(S)>!
44
Fault tolerance MapReduce si presta a implementare meccanismi di tolleranza ai guasti, INDIPENDENTI dal problema (grep, sort, wordcount,etc.) • Se il nodo Master si accorge di errori su uno slave,
riesegue Map e Reduce di quello slave da un’altra parte. • Se il nodo Master si accorge di errori sui blocchi, li
salta e riesegue Può non esser sufficiente per gli scopi del programmatore, ma c’è in tutte le implementazioni MapReduce.
45
M
Elasticità Hadoop implementa anche meccanismi per scalare l’architettura a runtime.
JobTracker Namenode
TaskTraker Datanode
TaskTraker Datanode
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
blk001! blk002! blk003!
M M
46
TaskTraker Datanode
M
Elasticità Hadoop implementa anche meccanismi per scalare l’architettura a runtime.
JobTracker Namenode
TaskTraker Datanode
TaskTraker Datanode
macchina 1 (master)
macchina 2 (slave) macchina 3 (slave)
blk001! blk002!
M M
macchina 4 (slave)
blk003!
47
Elasticità Particolarmente utile se la mia infrastruttura sottostante mi consente di aggiungere/togliere risorse facilmente…
=> il cloud è il supporto migliore consente elasticità nel provisioning e deprovisioning di nodi per la computazione (virtual machines)
48
In laboratorio ogni gruppo avrà a disposizione un cluster di 3 macchine virtuali configurate per eseguire dei job hadoop. • 1 master/slave
(jobtracker, namenode, tasktracker, datanode) • 2 slave
(tasktracker, datanode)
Vedremo • Creazione • Compilazione • Esecuzione di un programma MapReduce
49