Welcome 微信登录
编程资源 图片资源库 蚂蚁家优选 PDF转换器

首页 / 操作系统 / Linux / RabbitMQ 和 Kafka 简单的性能测试

测试环境:Ubuntu 15.10 64位cpu:inter core i7-4790 3.60GHZ * 8内存:16GB硬盘:ssd 120GB软件环境:rabbmitmq 3.6.0 kafka0.8.1  (均为单机本机运行) PS: 测试结果均为单操作测试,即生产的时候没有消费操作测试结果:kafka :消费速度: 37,586 /s  生产速度: 448,753 /srabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s出现问题:rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”其他并无任何问题结论:很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。代码:kafka:package main
import (
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "sync"
    "log"
    "time"

func main() {
    go producer()
//    go consumer()
    time.Sleep(10*time.Minute)
}func producer()  {
    config :=sarama.NewConfig()
    config.Producer.Return.Successes = true
    proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
    if err != nil {
        panic(err)
    }    signals :=make(chan  os.Signal,1)
    signal.Notify(signals,os.Interrupt)    var (
        wg                          sync.WaitGroup
        enqueued, successes, errors int
    )    wg.Add(1)
    go func() {
        defer  wg.Done()
        for _=range proder.Successes(){
            successes++
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range proder.Errors(){
            log.Println(err)
            errors++
        }
    }()    go func() {
        t1 := time.NewTicker(time.Second)
        for{
            <- t1.C
            log.Println(enqueued)
        }
    }()    ProducerLoop:    for{
        message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
        select {
        case proder.Input() <- message:
            enqueued++        case <- signals:
            proder.AsyncClose()
            break ProducerLoop
        }    }    wg.Wait()
    log.Println("Successfully produced:%d;errors:%d ",successes,errors)}func consumer()  {
    coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
    if err != nil {
        panic(err)
    }    defer func() {
        if err :=coner.Close(); err !=nil{
            log.Fatalln(err)
        }
    }()    partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }    defer func() {
        if err := partitionConsumer.Close();err!=nil{
            log.Fatalln(err)
        }
    }() 
    signals := make(chan os.Signal,1)
    signal.Notify(signals,os.Interrupt)
    consumed:=0    go func() {
        t1 := time.NewTicker(time.Second)
        for{
            <- t1.C
            log.Println(consumed)
        }
    }()    ConsumerLoop:
    for{
        select {
        case _ = <-partitionConsumer.Messages():            consumed++
//            log.Println( string(msg.Value),"  =>  ",consumed)
        case <-signals:
            break ConsumerLoop
        }
    }    log.Printf("Consumed: %d ", consumed)
}rabbitmq:package mainimport (
    "github.com/streadway/amqp"
    "time"
    "fmt"
    "log"
)const (
    queueName = "push.msg.q"
    exchange  = "t.msg.ex"
    mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push")var conn *amqp.Connection
var channel *amqp.Channelfunc main() {
    fmt.Println(1)
//    push()
    receive()
//    fmt.Println("end")
//    close()
}func failOnErr(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}func mqConnect() {
    var err error
    conn, err = amqp.Dial(mqurl)
    if err != nil {
        log.Println(1)
        log.Fatalln(err)
    }
    fmt.Println(5)
    channel, err = conn.Channel()
    if err != nil {
        fmt.Println(2)
        log.Fatalln(err)
    }else {
        fmt.Println("a")
    }
}func push() {
    count := 0
    if channel == nil {
        fmt.Println(2)
        mqConnect()
    }else {
        fmt.Println(3)
    }
    msgContent := "hello world!"
    t1 := time.NewTicker(time.Second)    go func() {
        for{
            <- t1.C
            log.Println(count)
        }
    }()    for{
        err := channel.Publish(exchange, "test", false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msgContent),
        })
        if err != nil {        }else {
            count ++
        }    }}func receive() {
    if channel == nil {
        mqConnect()
    }
    count :=0
    msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
    failOnErr(err, "")    forever := make(chan bool)    t1 := time.NewTicker(time.Second)
    go func() {
        for{
            <- t1.C
            log.Println(count)
        }
    }()
    go func() {
        //fmt.Println(*msgs)
        for _= range msgs {
            count ++
//            s := BytesToString(&(d.Body))
//            count++
//            fmt.Printf("receve msg is :%s -- %d ", *s, count)
        }
    }()    fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C ")
    <-forever
}CentOS 5.6 安装RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htmRabbitMQ客户端C++安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm用Python尝试RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htmRabbitMQ集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htmUbuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm在CentOS上安装RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htmRabbitMQ概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htmRabbitMQ入门教程  http://www.linuxidc.com/Linux/2015-02/113983.htmRabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里本文永久更新??接地址:http://www.linuxidc.com/Linux/2016-03/129093.htm