If you never touch real business problems you never pick up new technology, and you can't grow. In the recommendation field Kafka is a must-have skill, so bring it on. Let's start.
1-Install kafka-python
Many libraries are not installed under the same name you import them by, opencv for example; kafka-python is another (you install kafka-python but import kafka).
Update pip first (as described in that earlier post), then install:
$ pip install kafka-python
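To confirm the install worked, importing the package and printing its version is a quick check (the exact version string depends on your environment):

$ python -c "import kafka; print(kafka.__version__)"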
During testing I hit the bug below; it turned out my distributed Redis does not support keys(), and commenting out that line fixes it:
connection.py", line 756, in read_response
raise response
redis.exceptions.ResponseError: Protocol error: invalid multibulk length
.__conn.keys()
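If you actually need to enumerate keys rather than just comment the call out, redis-py's scan_iter() is a possible substitute, assuming your distributed Redis supports the SCAN command (not all proxies do; the address and pattern below are placeholders, not values from my setup):

import redis

r = redis.Redis(host='127.0.0.1', port=6379)  # placeholder address
# SCAN walks the keyspace incrementally instead of blocking like KEYS
for key in r.scan_iter(match='*'):
    print(key)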
2-Consume from Kafka
class KafkaConsumer(builtins.object)
| Consume records from a Kafka cluster.
| Keyword Arguments:
| bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
| strings) that the consumer should contact to bootstrap initial
| cluster metadata. This does not have to be the full node list.
| It just needs to have at least one broker that will respond to a
| Metadata API Request. Default port is 9092. If no servers are
| specified, will default to localhost:9092.
| client_id (str): A name for this client. This string is passed in
| each request to servers and can be used to identify specific
| server-side log entries that correspond to this client. Also
| submitted to GroupCoordinator for logging with respect to
| consumer group administration. Default: 'kafka-python-{version}'
| group_id (str or None): The name of the consumer group to join for dynamic
| partition assignment (if enabled), and to use for fetching and
| committing offsets. If None, auto-partition assignment (via
| group coordinator) and offset commits are disabled.
| Default: None
Here I only use the first and the third of these (bootstrap_servers and group_id); there are plenty more keyword arguments not shown above.
Consume the Kafka queue: I don't need the data in real time, and real-time consumption could cause performance problems, so simply draining the queue is enough. As below: subscribe to a topic, then actively pull data.
from kafka import KafkaConsumer

# only bootstrap_servers and group_id are used here
kc = KafkaConsumer(group_id='100000', bootstrap_servers=['10.00.86.11:1001'])
kc.subscribe(topics=('test',))

n = 10
while n > 0:
    msg = kc.poll(timeout_ms=120)  # returns a dict (possibly empty)
    print(type(msg))
    n -= 1
The poll() above pulls data from the subscribed topics:
poll(timeout_ms=0, max_records=None, update_offsets=True) method of kafka.consumer.group.KafkaConsumer instance
Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
On each poll, consumer will try to use the last consumed offset as the
starting offset and fetch sequentially. The last consumed offset can be
manually set through :meth:`~kafka.KafkaConsumer.seek` or automatically
set as the last committed offset for the subscribed list of partitions.
Incompatible with iterator interface -- use one or the other, not both.
Arguments:
timeout_ms (int, optional): Milliseconds spent waiting in poll if
data is not available in the buffer. If 0, returns immediately
with any records that are available currently in the buffer,
else returns empty. Must not be negative. Default: 0
max_records (int, optional): The maximum number of records returned
in a single call to :meth:`~kafka.KafkaConsumer.poll`.
Default: Inherit value from max_poll_records.
Returns:
dict: Topic to list of records since the last fetch for the
subscribed list of topics and partitions.
The dict you get back has TopicPartition keys and list values, and the list elements are:
>>> type(value[0])
<class 'kafka.consumer.fetcher.ConsumerRecord'>
That shape is awkward to work with directly, so just walk it with a for loop.
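For example, a minimal way to unpack one poll() result (the field names below come from the ConsumerRecord namedtuple; rec.value is raw bytes unless a value_deserializer was passed to the consumer):

for tp, records in msg.items():  # tp is a TopicPartition
    for rec in records:          # rec is a ConsumerRecord
        print(rec.topic, rec.partition, rec.offset, rec.value)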
3-Store data with multiple processes
Put the records into a queue, then store them in Redis with multiple processes. As noted above, mine is a distributed Redis, but the redis client can still read, write, and delete against it; only some commands are unavailable, such as keys() above, which is a risky operation anyway.
Since we are reading Kafka's queue (call it a data stream), we can't read everything first and then store it record by record through a pipeline; each record has to be stored as it arrives. So I put the data into a Queue and let a Process store it concurrently, as in the sketch below.
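A minimal sketch of that split, reusing the consumer settings from above; the Redis address and the list key kafka:test are placeholder assumptions, not values from my environment:

from multiprocessing import Process, Queue

import redis
from kafka import KafkaConsumer

def store(q):
    # writer process: pop records off the queue, push them into Redis
    r = redis.Redis(host='127.0.0.1', port=6379)  # placeholder address
    while True:
        value = q.get()
        if value is None:  # sentinel: producer is done
            break
        r.rpush('kafka:test', value)  # placeholder key

if __name__ == '__main__':
    q = Queue()
    p = Process(target=store, args=(q,))
    p.start()

    kc = KafkaConsumer(group_id='100000', bootstrap_servers=['10.00.86.11:1001'])
    kc.subscribe(topics=('test',))

    n = 10
    while n > 0:  # same bounded loop as in section 2
        for records in kc.poll(timeout_ms=120).values():
            for rec in records:
                q.put(rec.value)  # hand the raw bytes to the writer
        n -= 1

    q.put(None)  # tell the writer to stop
    p.join()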
You can refer to my earlier posts (reference 1, reference 2, reference 3), or search for yourself.
May we meet again one day, and may you still remember the topics we once discussed.