
Implementing k-means on the Hadoop platform

First, set up a pseudo-distributed environment on a single machine. This mainly involves modifying the *-site.xml configuration files (in Hadoop 2.x they live under etc/hadoop inside the Hadoop installation directory). The changes are as follows:

core-site.xml

<?xml version="1.0"?>
<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost/</value>
  </property>
</configuration>

hdfs-site.xml

<?xml version="1.0"?>
<!-- hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

yarn-site.xml

<?xml version="1.0"?>
<!-- yarn-site.xml -->
<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>localhost</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>

mapred-site.xml

<?xml version="1.0"?>
<!-- mapred-site.xml -->
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

Then start Hadoop (on a fresh installation, remember to format the NameNode first with hdfs namenode -format). The startup sequence is:

  1. start-dfs.sh
  2. start-yarn.sh
  3. mr-jobhistory-daemon.sh start historyserver

Note that these commands will only run correctly if the bin and sbin directories of the Hadoop installation have been added to the PATH (in Hadoop 2.x the start-*.sh scripts live in sbin).

Since I have not used Java in a long time, the implementation is written in Python, which requires one extra package: mrjob.

First, let's plan out the implementation:

[ Mapper ]

Accepts

  • data
  • global constant representing the list of centers

Computes

  • the nearest center for each data instance

Emits

  • nearest centers (key) and points (value).

[ Reducer ]

Accepts

  • center instance / coordinate (key)
  • points (value)

Computes

  • the new centers based on clusters

Emits

  • new centers

You will provide the next epoch of K-Means with:

  1. the same data from your initial epoch
  2. the centers emitted from the reducer as global constants

Repeat until your stopping criteria are met.
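To make the plan concrete, here is a minimal single-machine sketch of one such epoch in plain Python (no Hadoop involved; the point format, the euclidean helper and the kmeans_epoch name are illustrative assumptions, not part of the job below):

from math import sqrt

def euclidean(p, c):
    # distance between two 2-D points
    return sqrt((p[0] - c[0]) ** 2 + (p[1] - c[1]) ** 2)

def kmeans_epoch(points, centers):
    # "map": assign every point to its nearest center
    clusters = {}
    for p in points:
        nearest = min(range(len(centers)), key=lambda i: euclidean(p, centers[i]))
        clusters.setdefault(nearest, []).append(p)
    # "reduce": recompute each center as the mean of its cluster
    new_centers = list(centers)
    for k, pts in clusters.items():
        new_centers[k] = [sum(x for x, _ in pts) / len(pts),
                          sum(y for _, y in pts) / len(pts)]
    return new_centers

Each call to kmeans_epoch corresponds to one map-reduce pass; the driver simply feeds the returned centers back in until they stop moving.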

If you want to drive Hadoop from Python, you will almost certainly go through Hadoop Streaming. The problem is that a streaming job runs the map-reduce pipeline only once, whereas k-means, like EM, is an iterative machine-learning algorithm that needs many passes. The most obvious workaround is a shell script that calls the streaming command over and over, but that is quite ugly; the mrjob package can do this work for us instead.

From the plan above we need two source files: one is the Python map-reduce implementation, the other is the mrjob driver file, which acts as the master. Both are listed below; since the implementation is fairly simple, I will not explain it in much detail.

kmeans.py

from mrjob.job import MRJob
import mrjob
# MRJob is the Python class that we subclass below
from math import sqrt


class MRKMeans(MRJob):
    SORT_VALUES = True
    OUTPUT_PROTOCOL = mrjob.protocol.RawProtocol

    def dist_vec(self, v1, v2):
        # calculate the distance between two 2-D vectors
        return sqrt((v2[0] - v1[0]) * (v2[0] - v1[0]) + (v2[1] - v1[1]) * (v2[1] - v1[1]))

    def configure_options(self):
        super(MRKMeans, self).configure_options()
        # the line below declares that the file following the --c option is the
        # centroid file and should be made available to every task
        self.add_file_option('--c')

    def get_centroids(self):
        """
        Definition : extracts centroids from the centroids file given after the --c flag
        Out : returns the list of centroids
        """
        # self.options.c is the name of the file following the --c option
        f = open(self.options.c, 'r')
        centroids = []
        for line in f.read().split('\n'):
            if line:
                x, y = line.split(', ')
                centroids.append([float(x), float(y)])
        f.close()
        return centroids

    def mapper(self, _, lines):
        """
        Definition : the mapper takes the centroids extracted by get_centroids()
        and the point cloud; for each point it computes the distance
        to every centroid and finds the minimum
        Out : yields the point with its class
        """
        centroids = self.get_centroids()
        for l in lines.split('\n'):
            x, y = l.split(', ')
            point = [float(x), float(y)]
            min_dist = 100000000.0
            classe = 0
            # iterate over the centroids (here we know that we are doing a 3-means)
            for i in range(3):
                dist = self.dist_vec(point, centroids[i])
                if dist < min_dist:
                    min_dist = dist
                    classe = i
            yield classe, point

    def combiner(self, k, v):
        """
        Definition : for each class, compute the mean point of this mapper's
        local output before it reaches the reducer
        Out : yields, for each class, this mapper's partial centroid
        """
        count = 0
        moy_x = moy_y = 0.0
        for t in v:
            count += 1
            moy_x += t[0]
            moy_y += t[1]
        yield k, (moy_x / count, moy_y / count)

    def reducer(self, k, v):
        """
        Definition : for each class, gather the partial centroids from every
        combiner and compute the new centroid.
        """
        # k is the class and v are the partial centroids for that class
        count = 0
        moy_x = moy_y = 0.0
        for t in v:
            count += 1
            moy_x += t[0]
            moy_y += t[1]
        print str(k) + ", " + str(moy_x / count) + ", " + str(moy_y / count)


if __name__ == '__main__':
    # just run the MapReduce job!
    MRKMeans.run()
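One design detail worth pointing out: the combiner above emits only the per-mapper average of each class, not the point count, so the reducer ends up averaging averages. That is exact only when every combiner contributes the same number of points per class; with many mappers on a large dataset it becomes an approximation. A sketch of a weighted variant (carrying the count through, method names matching the class above) might look like this:

    # sketch only: carry (sum_x, sum_y, count) through the combiner so the
    # reducer can compute an exact weighted mean
    def combiner(self, k, points):
        sx = sy = 0.0
        n = 0
        for x, y in points:
            sx += x
            sy += y
            n += 1
        yield k, (sx, sy, n)

    def reducer(self, k, partials):
        sx = sy = 0.0
        n = 0
        for px, py, pn in partials:
            sx += px
            sy += py
            n += pn
        print str(k) + ", " + str(sx / n) + ", " + str(sy / n)

For the tiny demo below the difference hardly matters, but it becomes relevant at scale.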

main.py

from mrjob.job import MRJob
from kmeans import MRKMeans
import sys
import os.path
import shutil
from math import sqrt
import time

input_c = "centroids"
CENTROIDS_FILE = "/home/hduser/tmp/centroid"


def get_c(job, runner):
    c = []
    for line in runner.stream_output():
        # print "stream_output: ", line
        key, value = job.parse_output_line(line)
        c.append(key)
    return c


def get_first_c(fname):
    f = open(fname, 'r')
    centroids = []
    for line in f.read().split('\n'):
        if line:
            x, y = line.split(', ')
            centroids.append([float(x), float(y)])
    f.close()
    return centroids


def write_c(centroids):
    f = open(CENTROIDS_FILE, "w")
    centroids.sort()
    for c in centroids:
        k, cx, cy = c.split(', ')
        # print c
        f.write("%s, %s\n" % (cx, cy))
    f.close()


def dist_vec(v1, v2):
    return sqrt((v2[0] - v1[0]) * (v2[0] - v1[0]) + (v2[1] - v1[1]) * (v2[1] - v1[1]))


def diff(cs1, cs2):
    max_dist = 0.0
    for i in range(3):
        dist = dist_vec(cs1[i], cs2[i])
        if dist > max_dist:
            max_dist = dist
    return max_dist


if __name__ == '__main__':
    args = sys.argv[1:]
    if not os.path.isfile(CENTROIDS_FILE):
        shutil.copy(input_c, CENTROIDS_FILE)
    old_c = get_first_c(input_c)
    i = 1
    start = time.time()
    while True:
        print "Iteration #%i" % i
        mr_job = MRKMeans(args=args + ['--c=' + CENTROIDS_FILE])
        # print "start runner.."
        with mr_job.make_runner() as runner:
            runner.run()
            centroids = get_c(mr_job, runner)
            # print "mr result: ", centroids
            write_c(centroids)
        n_c = get_first_c(CENTROIDS_FILE)
        # print "old_c", old_c
        # print "n_c", n_c
        max_d = diff(n_c, old_c)
        # print "dist max = "+str(max_d)
        if max_d < 0.01:
            break
        else:
            old_c = n_c
        i = i + 1
    print "used time: ", time.time() - start, 's'

As the implementation plan above shows, we also need two data files: one holding the input data and one holding the centroids. Since this is only a demo, I have simplified the problem: all data are two-dimensional points and the number of clusters is 3. Of course, for industrial-scale processing of genuinely large data, Spark is still the recommended choice. The two files are listed below:

kmeans_data

1, 2
2, 3
1, 3.5
4, 3.5
3, 4.2
2, 1.6
5, 2.3
1.5, 2.3
3, 5.2
2, 3
1, 3.5
4, 3.5
3, 4.2
2, 1.6
5, 2.3
1.5, 2.3
3, 5

centroids

1, 2
2, 3
1, 3.5

Run it as follows:

python main.py kmeans_data -r hadoop
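Here -r hadoop tells mrjob to run the job on the Hadoop cluster (mrjob takes care of copying the local kmeans_data input into HDFS for the job), while main.py appends the --c option so that the current centroid file is shipped to every task via add_file_option.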

The output looks like this:

Iteration #1
No handlers could be found for logger "mrjob.hadoop"
old_c [[1.0, 2.0], [2.0, 3.0], [1.0, 3.5]]
n_c [[1.625, 1.95833333333], [3.4, 3.62], [1.0, 3.5]]
Iteration #2
old_c [[1.625, 1.95833333333], [3.4, 3.62], [1.0, 3.5]]
n_c [[1.72916666667, 2.2625], [3.75, 3.775], [1.0, 3.5]]
Iteration #3
old_c [[1.72916666667, 2.2625], [3.75, 3.775], [1.0, 3.5]]
n_c [[1.72916666667, 2.2625], [3.75, 3.775], [1.0, 3.5]]
time: 148.277868032

Finally, the result file is generated:

centroid

1.72916666667, 2.2625
3.75, 3.775
1.0, 3.5

As we can see, on a small dataset the efficiency is actually rather poor, because most of the program's running time is spent on overhead rather than on the algorithm itself.

TODO:

  1. Implement it with a conventional single-machine method as a baseline (see the sketch below)
  2. Continue experimenting on larger datasets and observe the results
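For TODO item 1, a minimal single-machine baseline might look like the following sketch (assuming NumPy; the function name and the parsing of kmeans_data are my own assumptions, with k = 3 and the first k points as initial centers, just like the centroids file in the demo above):

import numpy as np

def kmeans_baseline(points, k=3, max_iter=100, tol=0.01):
    # points: (n, 2) array; start from the first k points, as the demo does
    centers = points[:k].copy()
    for _ in range(max_iter):
        # assign each point to its nearest center
        dists = np.linalg.norm(points[:, None, :] - centers[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # recompute centers; keep the old one if a cluster is empty
        new_centers = np.array([points[labels == j].mean(axis=0) if (labels == j).any()
                                else centers[j] for j in range(k)])
        # stop once the largest center movement falls below tol
        if np.linalg.norm(new_centers - centers, axis=1).max() < tol:
            return new_centers
        centers = new_centers
    return centers

# usage, assuming kmeans_data has been parsed into a float array:
# data = np.loadtxt("kmeans_data", delimiter=",")
# print(kmeans_baseline(data))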