Hadoop MapReduce

Büyük veri yazılımları içeren Apache Hadoop açık kaynak yazılım projesinin en önemli bileşenlerinden ilki MapReduce, diğeri ise HDFS (Hadoop Distributed File System) dağıtık dosya sistemi modülüdür. MapReduce, büyük boyutlu veriyi bilgisayar kümeleri üzerinde paralel olarak işlemek üzere uygulama yazabileceğimiz bir programlama modeli ve bu uygulamayı çalıştırabileceğimiz bir ortam sunar. MapReduce denildiğinde ilk olarak Hadoop akla gelse de, büyük veri işleme ve analiz araçlarının birçoğu MapReduce yaklaşımını izlemektedirler.

MapReduce modeli, fonksiyonel programlama dünyasından gelen Map (eşle) ve Reduce (indirge) isminde iki görev içerir. Map, bir veri kümesini alıp başka bir veri kümesine dönüştürür. Reduce ise, Map görevinin çıktısını alıp işleyerek, daha küçük bir sonuca indirger. MapReduce adının sıralamasından da anlaşılacağı üzere, Map görevi her zaman Reduce işleminden önce gerçekleştirilir. MapReduce modelinin en büyük avantajı, görevleri hesaplama düğümleri üzerine dağıtma, ölçeklendirme, görevleri çalıştırma, sonuçları alma gibi işlemlerin MapReduce ortamı tarafından otomatik olarak yapılmasıdır. Kullanıcıya düşen sorumluluk ise Map ve Reduce görevlerini kodlamaktır.

Map görevi, kendisine verilen <K1,V1> anahtar-değer ikilisini işleyerek <K2,V2> anahtar-değer ikililerinden oluşan bir liste, yani List(<K2,V2>) üretir. MapReduce uygulamasına girdi olarak HDFS dosyası veya klasörü verildiyse, buradaki dosya veya dosyalar bloklara bölünür ve her bir blok <blok kimliği, blok içeriği> ikilisi olarak ayrı bir Map görevine iletilir (blok kimliği sistem tarafından otomatik olarak üretilir).

Map görevlerinin çıktısı olan <K2,V2> anahtar-değer ikilileri, sistem tarafından anahtarlarına göre <K2,List(V2)> yapısında gruplanırlar. Bu sırada gruplar da aralarında anahtarlara göre sıralanmış olurlar. Bu işlemlerin gerçekleştiği aşamaya Karıştırma & Sıralama (Shuffling & Sorting) aşaması denir.

Oluşturulan <K2,List(V2)> gruplarından her biri bir Reduce görevine iletilir. Bu görev de kendisine verilen V2 değerlerinden oluşan listeyi işleyerek List(<K3,V3>) listesini oluşturur ve bu liste sonuçların tutulduğu HDFS klasöründe ayrı bir dosya içine yazılır.

Burada anlatılan girdi-çıktı yapıları aşağıdaki tablodaki gibi özetlenebilir:

Girdi

Görev

Çıktı

<K1,V1>

Map

List(<K2,V2>)

<K2,List(V2)>

Reduce

List(<K3,V3>)

MapReduce uygulamasına verilen dosya veya dosyaların bloklara bölünmesi ile elde edilen blok sayısı kadar Map görevi sistem tarafından oluşturulur ve görevlerin her biri ayrı bir bloku işlemek için çalıştırılır. Sistemde geçerli olan varsayılan blok büyüklüğü (blocksize) kullanıcı tarafından değiştirilebilir. Bu değişiklik, çalışacak Map görevi sayısını etkileyebilecektir. Reduce görevlerinin sayısı (mapreduce.job.reduces) ise kullanıcı tarafından belirlenebilir; varsayılan değeri 1’dir.

Bu noktada “kelime sayma” örneği ile ilerleyebiliriz. İçinde meyve isimleri bulunan aşağıdakine benzer bir dosyada hangi kelimenin kaç kez geçtiğini MapReduce ile bulmaya çalışalım:

elma armut elma erik

çilek elma armut armut

erik çilek erik erik

Bu amaçla, dosya içeriğini işlemek üzere önce Map görevini yazalım:

public void map(LongWritable key, Text value, Context con)
                   throws IOException, InterruptedException {
   String line = value.toString();
   String[] words=line.split(",");
   for(String word: words) {
       Text outputKey = new Text(word.toUpperCase().trim());
       IntWritable outputValue = new IntWritable(1);
       con.write(outputKey, outputValue);
   }
}

Map görevi kendisine verilen <blok kimliği, blok içeriği> ikilisini alıp, blok içeriğini kelimelerine ayıracak ve her bir kelime için <kelime, 1> ara çıktısını üretecektir. Örneğin, kullanılan blok boyutu gereğince girdi dosyamızın

elma armut elma erik

çilek elma armut armut

erik çilek erik erik

şeklinde üç bloka ayrılacağını varsayarsak, çalıştırılacak üç Map görevi tarafından

<elma, 1>

<armut, 1>

<elma, 1>

<erik, 1>

<çilek, 1>

<elma, 1>

<armut, 1>

<armut, 1>

<erik, 1>

<çilek, 1>

<erik, 1>

<erik, 1>

ara çıktıları üretilecektir.

Örneğimiz için Reduce görevi sayısını da iki olarak tanımladığımızı varsayarsak, bu ara çıktılar Karıştırma & Sıralama aşaması tarafından meyve ismi anahtar alanına göre aşağıdaki şekilde gruplanıp sıralanabilecektir:

<armut, List(1, 1, 1)>

<çilek, List(1, 1)>

<elma, List(1, 1, 1)>

<erik, List(1, 1, 1, 1)>

Bu grupları işlemek üzere Reduce görevini de şu şekilde yazabiliriz:

public void reduce(Text word, Iterable<IntWritable> values, Context con)
                          throws IOException, InterruptedException {
   int sum = 0;
   for(IntWritable value: values)
       sum += value.get();
   con.write(word, new IntWritable(sum));
}

Bu görev, kendisine verilen her bir grup için anahtara yani meyve ismine karşılık gelen 1 değerlerinin toplamını bulup aşağıdaki sonucu üretecektir:

<armut, 3>

<çilek, 2>

<elma, 3>

<erik, 4>

Map ve Reduce dışında, uygulama geliştirici dilerse Combiner ve Partitioner işlevlerini de kodlayabilir. Bunlardan ilki, her bir Map görevinin bitiminde, Map çıktısını toparlamak amacıyla ilgili Map ile aynı düğüm üzerinde çalıştırılır. Bu açıdan Combiner işlevine yerel toparlayıcı veya indirgeyici de denir. Bu örnekte, Reducer için kullandığımız kodun aynısını Combiner için de kullanabiliriz. Bu durumda, her bir Map görevinin ara çıktısı anında Combiner işlevine aktarılacak ve aşağıdaki sonuçlar elde edilecektir:

<elma, 2>

<armut, 1>

<erik, 1>

<çilek, 1>

<elma, 1>

<armut, 2>

<erik, 3>

<çilek, 1>

Buradan da anlaşılacağı üzere, 1 sayılarını toplama işinin bir kısmı hemen Map bitiminde yerel olarak gerçekleştirilecek; böylece sonrasında işlenecek ve taşınacak verinin hacmi bir miktar küçülmüş olacaktır.

Bu ara çıktıların Karıştırma & Sıralama aşamasında işlenmesi ile de, Reducer görevlerine iletilmek üzere şu sonuçlar elde edilmiş olacaktır:

<armut, List(1, 2)>

<çilek, List(1, 1)>

<elma, List(2, 1)>

<erik, List(1, 3)>

Bu grupların Reduce görevleri tarafından işlenmesinin ardından, yukarıdaki ile aynı sonuçlar elde edilmiş olacaktır:

<armut, 3>

<çilek, 2>

<elma, 3>

<erik, 4>

Partitioner ise, varsa Combiner işlevi, yoksa doğrudan Map görevi çıktısı olan <K2, V2> ikililerinin girdi olarak hangi Reducer görevine iletileceğini belirler. Varsayılan Partitioner olarak

hash(key) mod ToplamReduceGöreviSayısı

tanımına sahip “Hash Partitioner” kullanılmakta olup, çözüme katkısı olacak özel durumlar dışında yeni bir Partitioner tanımlamaya gerek yoktur.

MapReduce uygulamaları tipik olarak burada anlatıldığı şekilde HDFS üzerinde tutulan dosya içeriklerini işlerler. Bununla birlikte, bu uygulamaların (farklı dosya sistemleri, veri tabanları, veri toplama araçları gibi) başka ortamlardan veri alıp işleyecek şekilde kodlanmaları da mümkündür.