gensim word2vec中的分布式计算 速度没

为何要分布式计算?
需要构建一个百万文档级语料库的语义代表,却耗时太~~长?手上有几个可用的闲置计算机?分布式计算力争通过将给定的任务切分为几个小型任务,并将这些任务指派给几台平行的计算机完成来实现加速计算。
在这里提到的计算节点是指通过其IP地址/端口识别的计算机,并通过TCP/IP协议完成通讯。所有可用的计算机作为一个整体,称为集群(cluster)。分布式是非常粗粒度的(没有太多实时通讯),因此允许网络有相对较高的延迟。
警告:使用分布式的最重要的原因是加快速度。在gensim中,大多数时间消耗在其内部的底层线性代数计算上,主要是NumPy中,与任何gensim代码无关。为NumPy安装一个快速的基本线性代数(BLAS)库能将提高至15倍!因此在你开始购买额外的计算机前,先考虑安装一个快速的、多线程的BLAS来使你的机器发挥其更大的能力(而不是一个通用的、二进制分布式库)。可选的BLAS库包括你的供应商的BLAS库(Inter MKL,AMD ACML,OS X vecLib,Sun
Sunperf)或者一些开源的 (GotoBLAS, ALTAS)。
想查看你正在使用哪个BLAS和LAPACK,你可以在命令行输入:
python -c ' scipy.show_config()'
Gensim使用Python远程对象(Remote Object,Pyro)实现节点间的通讯,版本号不低于4.27。这是一个底层套接字通讯和远程程序调用(RPC)库。Pyro是一个纯Python库,因此其安装十分简单,仅需将其*.py文件复制到你的Python的import路径。
sudo easy_install Pyro4
你不需要为了运行gensim安装Pyro,但是如果你不安装,就不能使用其分布式计算(即所有的过程都将是连续模式运行,本页面的例子无法使用)。
就像往常一样,gensim努力做到干净、简单的API(参见介绍部分的“属性”)。你不需要为了能在计算机集群上运行而对你的代码进行任何修改!
你需要做的是开始计算前在每一个集群节点上运行一个工作者脚本(见下方)。运行该脚本告知gensim它可以使用这些节点工作。在初始化时,gensim内部算法将会尝试寻找和使用所有可用的工作者节点。
一个逻辑工作单元。可以是一台物理机器,也可以在一台机器上运行多个工作者脚本得到多个逻辑节点。
几个可以通过TCP/IP通讯的节点。现在,网络广播被用来发现和连接所有通讯节点,因此节点必须在同一个广播域。
在每个节点上创建的进程。从集群中移除节点,仅需结束他的工作者进程。
调度器将会负责协调所有计算任务、队列、分发(“分派”)各个工作者的工作。计算指令从不与工作者节点直接交流,仅仅通过调度器。一个集群中同一时间仅能有一个活动的调度器,不像工作者可以有多个。
其他分布式算法
分布式潜在语义分析
分布式隐含狄利克雷分配
注明:本文章属于转载,仅供行业人员学习交流使用,文章版权属于原创作者,在此向原创者致敬,感谢原创作者为大家学习交流提供精品内容。
站方声明:IThao123是为广大互联网从业者免费提供学习交流的平台,如果侵犯了原创著作权,请联系站方删除,给你带来不便,深表歉意。2458人阅读
Python(10)
仅供个人学习之用,如有错误,敬请指正。
如果想要开启日志,别忘记设置:
&&& import logging
&&& logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', )
相似度接口
在之前的教程和中,我们了解了创建在向量空间创建一个语料库意味着什么,如何在不同的向量空间之间转换。我们所做的一切都是为了一个共同目标:决定文档对之间的相似度或者一篇特定文档和其他文档之间的相似度(例如用户输入与已索引文档)。
为了展示如何在gensim中做这项工作,让我们使用与之前相同的语料库(这个语料库真的来自于1990年的Deerwester等的):
&&& from gensim import corpora, models, similarities
&&& dictionary = corpora.Dictionary.load('/tmp/deerwester.dict')
&&& corpus = corpora.MmCorpus('/tmp/deerwester.mm')
&&& print(corpus)
MmCorpus(9 documents, 12 features, 28 non-zero entries)
为了模仿Deerwester的例子,我们首先用这个微型语料库来定义一个2维LSI空间:
lsi = models.LsiModel(corpus, id2word=dictionary, num_topics=2)
现在假设一个用户键入查询“Human computer interaction”,我们应该对我们的九个语料库文档按照与该输入的关联性逆向排序。与现代搜索引擎不同,我们仅集中关注一个单一方面的可能的相似性——文本(单词)的语义关联性。没有超链接,没有随机行走静态排列,只是在关键词布尔匹配的基础上进行了语义扩展。
&&& doc = "Human computer interaction"
&&& vec_bow = dictionary.doc2bow(doc.lower().split())
&&& vec_lsi = lsi[vec_bow]
&&& print(vec_lsi)
[(0, -0.461821), (1, 0.070028)]
此外,我们将会考虑来决定两个向量的相似性。余弦相似性是一种向量空间模型的标准方法,但是对于表示概率分布的向量,其他相似度方法可能更好。
初始化查询结构
为了准备相似度查询,我们需要输入所有我们我们需要比较的文档。在本例中,他们是LSI训练中用到的被转换到2-D的LSA空间的9个文档。但是这只是偶然,我们也可能索引完全不同的语料库。
&&& index = similarities.MatrixSimilarity(lsi[corpus])
警告:similarities.MatrixSimilarity类仅仅适合能将所有的向量都在内存中的情况。例如,如果一个百万文档级的语料库使用该类,可能需要2G内存与256维LSI空间。
如果没有足够的内存,你可以使用similarities.Similarity类。该类的操作只需要固定大小的内存,因为他将索引切分为多个文件(称为碎片)存储到硬盘上了。它实际上使用了similarities.MatrixSimilarity和similarities.SparseMatrixSimilarity两个类,因此它也是比较快的,虽然看起来更加复杂了。
索引也可以通过标准的save()和load()函数来存储到硬盘上。
&&& index.save('/tmp/deerwester.index')
&&& index = similarities.MatrixSimilarity.load('/tmp/deerwester.index')
所有的索引类都可以用这种方法(similarities.Similarity,similarities.MatrixSimilarity和similarities.SparseMatrixSimilarity)。下面也是,索引可以是任何一个索引类。如果有疑问,可以使用similarities.Similarity,因为它是最具扩展性的版本,它也支持之后增加更多的文档索引。
为了获得我们的查询文档相对于其他9个经过索引的文档的相似度:
&&& sims = index[vec_lsi]
&&& print(list(enumerate(sims)))
[(0, 0.), (1, 0.), (2, 0.), (3, 0.9865886), (4, 0.),
(5, -0.), (6, -0.1063926), (7, -0.), (8, 0.)]
余弦方法返回的相似度在-1~1之间(越大越相似),所以第一个文档分数为0.等。
使用一些标准Python函数,我们将这些相似性倒序排列,并且获得了查询“Human computer interaction”的结果。
&&& sims = sorted(enumerate(sims), key=lambda item: -item[1])
&&& print(sims)
(3, 0.9865886),
(6, -0.1063926),
(出处结果的注释是我加的,为了方便观察。)
需要注意,使用标准的布尔全文搜索将不会返回编号分别为2和4的文档(The EPS user interface management system”,”Relation of user perceived response time to error measurement”),因为他们与“Human computer interaction”没有任何相同的单词。可是,在使用了LSI后,我们可以看到他们都得到了比较高的分数(2号文档是最相似的),直观感觉上他们都更加符合我们查询的“computer-human”(“人机”)相关主题。事实上,将语义概括化是我们首先使用转换和主题模型的原因。
接下来做什么
恭喜,你已经完成了教程-现在你知道gensim如何工作了。^_^为了钻研详细内容,你可以通篇浏览、阅读或者可能试一试gensim的。
Gensim是一个比较成熟的工具包,很多公司、个人已经成功将该工具应用于快速原型和生产。但是也并不意味着他是完美的。
还有一些部分可以用更加高效地方式实现(例如用c实现),或者更好地利用并行(多台机器内核)。
随时都有可能发布新的算法;你可以通过与帮助gensim跟上时代的步伐。
欢迎并且感激你的反馈(不仅仅是代码):、或者仅仅是贡献你的使用经理及问题。
Gensim并没有野心成为一个能包含自然语言处理(NLP)(或者仅仅是机器学习)领域的一切的框架。他的任务只是帮助NLP实践者能轻松地尝试流行的主题模型算法应用于大型数据集,并且帮助研究人员设计新算法原型。
&&相关文章推荐
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:27852次
排名:千里之外
评论:15条
(1)(1)(1)(1)(9)(1)(1)(1)Based on my previous question
I think that I should be able to parse basically any input by sc.textFile() and then using my or from some library custom functions.
Now I am particularly trying to parse the wikipedia dump using gensim framework. I have already installed gensim on my master node and all my worker nodes and now I would like to use gensim build in function for parsing wikipedia pages inspired by this question .
My code is following:
import sys
import gensim
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print && sys.stderr, "Usage: wordcount &file&"
sc = SparkContext(appName="Process wiki - distributed RDD")
distData = sc.textFile(sys.argv[1])
#take 10 only to see how the output would look like
processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
print processed_data
The source code of extract_pages can be found at
and based on my going through it seems that it should work with Spark.
But unfortunately when I run the code I'm getting following error log:
14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, &ip address&.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
yield next(iterator)
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages
elems = (elem for _, elem in iterparse(f, events=("end",)))
File "&string&", line 52, in __init__
IOError: [Errno 2] No such file or directory: u'&mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en"&'
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
org.apache.spark.api.python.PythonRDD$$anon$1.&init&(PythonRDD.scala:154)
org.apache.spark.api.pute(PythonRDD.scala:87)
org.apache.spark.puteOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
And then some probably Spark log:
14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I've tried this without Spark successfully, so the problem should be somewhere in combination of Spark and gensim, but I don't much understand the error that I'm getting. I don't see any file reading in the line 190 of gensim wikicorpus.py.
Added some more logs from Spark:
gensim uses from xml.etree.cElementTree import iterparse, documentation , which might cause the problem. It actually expects file name or file containing the xml data. Can be RDD considered as file containing the xml data?
解决方案 I usually work with Spark in Scala.
Nevertheless here are my thoughts:
When you load a file via sc.textFile, it is some sort of line iterator which is distributed across your sparkWorkers.
I think given the xml format of the wikipedia one line does not necessarily corresponds to a parsable xml item, and thus you are getting this problem.
&title& blabla &/title& &subitem&
Line 3 : &/subItem&
Line 4 : &/item&
If you try to parse each line on its own, it will spit out exceptions like the ones you got.
I usually have to mess around with a wikipedia dump, so first thing I do is to transform it into a "REadable version" which is easily digested by Spark. i.e: One line per article entry.
Once you have it like that you can easily feed it into spark, and do all kind of processing.
It doesn't take much resources to transform it
Take a look at ReadableWiki:
本文地址: &
根据我的previous问题<α href=\"/questions//spark-and-python-use-custom-file-format-generator-as-input-for-rdd?noredirect=1#comment77307\">Spark和Python使用自定义的文件格式/发电机作为RDD输入我想我应该可以通过sc.textFile()基本上解析任何输入,然后用我的还是从某些库自定义函数。现在我特别想用gensim框架来分析维基百科转储。我已经我的主节点和我所有的工作节点上安装gensim,现在我想用gensim建立功能解析这个问题List通过MAP(PySpark)返回元组(或迭代器)。我的code为以下内容: 进口SYS进口gensim从pyspark进口SparkContext如果__name__ ==“__main__”:
如果len(sys.argv中)= 2!
打印&GT;&GT; sys.stderr来的,“用法:单词计数&LT;文件&”
出口(-1)
SC = SparkContext(的appName =“过程维基 - 分布式RDD”)
distData = sc.textFile(sys.argv中[1])
#take 10只看见输出会是什么样子
processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages)。取(10)
打印processed_data
sc.stop()
extract_pages的来源$ C $ C可以在并根据我的经历看来,它应该与星火工作。 但不幸的是,当我运行code我收到以下错误日志:
14/10/05 13点21分11秒WARN scheduler.TaskSetManager:在第一阶段0.0迷失任务0.0(TID 0,&LT; IP地址& .ec2.internal):组织。 apache.spark.api.python.PythonException:回溯(最近通话最后一个):文件“/root/spark/python/pyspark/worker.py”线79,主serializer.dump_stream(FUNC(split_index,迭代器),OUTFILE)文件“/root/spark/python/pyspark/serializers.py”,196线,在dump_streamself.serializer.dump_stream(self._batched(迭代器),流)文件“/root/spark/python/pyspark/serializers.py”,第127行,在dump_stream在迭代器OBJ:文件“/root/spark/python/pyspark/serializers.py”,185线,在_batched在迭代器项目:文件“/root/spark/python/pyspark/rdd.py”,1148线,在takeUpToNumLeft接下来的产量(迭代器)文件“/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py”线190,在extract_pageselems =(ELEM为_,在ELEM iterparse(F,事件=(“结束”,)))文件“&LT;串GT;”52行,在__init__IO错误:[错误2]没有这样的文件或目录:U'&LT;链接到MediaWiki的xmlns =“http://www.mediawiki.org/xml/export-0.9/”的xmlns:XSI =“HTTP://www.w3。组织/ 2001 / XML模式实例“XSI:的schemaLocation =”http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd“版本=” 0.9“XML:LANG =”EN“GT&;'
org.apache.spark.api.python.PythonRDD $$匿名$ 1.read(PythonRDD.scala:124)
org.apache.spark.api.python.PythonRDD $$不久$ 1&LT;&初始化GT;(PythonRDD.scala:154)
org.apache.spark.api.pute(PythonRDD.scala:87)
org.apache.spark.puteOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745) 然后一些大概星火日志:
14/10/05 13时21分十二秒错误scheduler.TaskSetManager:任务0级0.0失败了4倍;中止工作14/10/05 13时21分十二秒INFO scheduler.TaskSchedulerImpl:删除taskset的0.0,其任务已全部建成后,从池14/10/05 13时21分十二秒INFO scheduler.TaskSchedulerImpl:取消0期14/10/05 13时21分十二秒INFO scheduler.DAGScheduler:无法在PythonRDD.scala运行runJob:296 和 在org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1174)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.适用(DAGScheduler.scala:1173)在scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59)在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)在org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala 1173)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.适用(DAGScheduler.scala:688)在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.适用(DAGScheduler.scala:688)在scala.Option.foreach(Option.scala:236)在org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)在org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)在akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)在akka.actor.ActorCell.invoke(ActorCell.scala:456)在akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)在akka.dispatch.Mailbox.run(Mailbox.scala:219)在akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)在scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)在scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 我不星火成功尝试这个,所以问题应该在Spark和gensim的组合地方,但我不太明白,我得到的错误。我没有看到任何文件中gensim wikicorpus.py线190阅读。
编辑: 从星火加些木柴:
gensim从使用xml.etree.cElementTree进口iterparse ,文档的,这可能会导致问题。它实际上包含预计XML数据文件名或文件。可以RDD视为包含XML数据文件?解决方案 我通常在斯卡拉星火工作。不过这里有我的想法:当您通过加载一个sc.textFile文件,它是某种这是在你的sparkWorkers分布线迭代器。我觉得给维基百科一行并不一定对应于可解析XML项目的XML格式,因此你得到了这个问题。即:
1号线:其中;项目&GT; 2号线:其中,标题&GT;布拉布拉&LT; /标题&GT; &LT;&子项目GT; 3号线:LT; /子项目&GT; 4号线:LT; /项目&GT; 如果您尝试分析自身的每一行,它会吐出就像你的那些异常。我通常有更动维基百科转储,所以我做的第一件事就是把它改造成一个“可读的版本”,这很容易被消化星火。即:每篇文章录入一行。一旦你拥有了它一样,你可以很容易地反馈到火花,并做各种处理。它并不需要很多资源来改造它。看看ReadableWiki:
本文地址: &
扫一扫关注官方微信

我要回帖

更多关于 gensim doc2vec 的文章

 

随机推荐