使用Kafka和MongoDB进行Go异步处理

网友投稿 734 2023-05-07

使用Kafka和***进行Go异步处理

使用Kafka和***进行Go异步处理

在这个示例中,我将数据的保存和 *** 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

如果你有时间去看,我将这个博客文章的整个过程录制到 这个视频中了 :)

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

rest-kafka-mongo-microservice-draw-io

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 *** 中。

在你继续之前,我们需要能够去运行这些微服务的几件东西:

下载 Kafka —— 我使用的版本是 kafka_2.11-1.1.0安装 librdkafka —— 不幸的是,这个库应该在目标系统中安装 Kafka Go 客户端运行 ***。你可以去看我的 以前的文章 中关于这一块的内容,那篇文章中我使用了一个 *** docker 镜像。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

$ cd //kafka_2.11-1.1.0$ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

$ bin/kafka-server-start.sh config/server.properties

version: '3'services: mongodb: image: mongo ports: - "27017:27017" volumes: - "mongodata:/data/db" networks: - network1 volumes: mongodata: networks: network1:

使用 Docker Compose 去运行 *** docker 容器。

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 ***:

rest-to-kafka/rest-kafka-sample.go

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

func main() {  //Create *** session session := initialiseMongo() mongoStore.session = session  receiveFromKafka() } func receiveFromKafka() {  fmt.Println("Start receiving from Kafka") c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "group.id": "group-id-1", "auto.offset.reset": "earliest", })  if err != nil { panic(err) }  c.SubscribeTopics([]string{"jobs-topic1"}, nil)  for { msg, err := c.ReadMessage(-1)  if err == nil { fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value)) job := string(msg.Value) saveJobToMongo(job) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } }  c.Close() } func saveJobToMongo(jobString string) {  fmt.Println("Save to ***") col := mongoStore.session.DB(database).C(collection)  //Save data into Job struct var _job Job b := []byte(jobString) err := json.Unmarshal(b, &_job) if err != nil { panic(err) }  //Insert job into *** errMongo := col.Insert(_job) if errMongo != nil { panic(errMongo) }  fmt.Printf("Saved to *** : %s", jobString) }

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

$ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。

Screenshot-2018-04-29-22.20.33

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 ***。

$ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 ***。

Screenshot-2018-04-29-22.24.15

检查一下数据是否保存到了 ***。如果有数据,我们成功了!

Screenshot-2018-04-29-22.26.39

完整的源代码可以在这里找到:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Redis详解以及Redis的应用场景
下一篇:如何优雅地使用Redis之位图操作
相关文章