go-and-rabbitmq/rabbit/rabbit.go

62 lines
1.8 KiB
Go

package rabbit
import (
"fmt"
"github.com/streadway/amqp"
"vnfco.ir/rabbit/fail"
)
//ConnectToAMPQServerAndCreateChannel to conncet to amq server and return connection
func ConnectToAMPQServerAndCreateChannel(host string, port string, username string, password string) (*amqp.Connection, *amqp.Channel) {
connection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", username, password, host, port))
fail.FailOnError(err, "Failed to connect to RabbitMQ")
channel, err := connection.Channel()
fail.FailOnError(err, "Failed to Create Channel to RabbitMQ")
return connection, channel
}
// CreateOrJoinSimpleQueue A queue on rabbit mq
func CreateOrJoinSimpleQueue(Channel *amqp.Channel, QueueName string) *amqp.Queue {
queue, err := Channel.QueueDeclare(
QueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
fail.FailOnError(err, "Failed to Create Queue Or Join")
return &queue
}
//Listen on A queue in rabbit mq
func Listen(Channel *amqp.Channel, Queue *amqp.Queue) <-chan amqp.Delivery {
messages, err := Channel.Consume(
Queue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
fail.ShowError(err, "Failed to Consume a Queue")
return messages
}
//Publish : send message to the queue
func Publish(Channel *amqp.Channel, Queue *amqp.Queue, Body string, ContentType string) error {
err := Channel.Publish(
"", // exchange
Queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: ContentType,
Body: []byte(Body),
})
fail.ShowError(err, "Failed to Send Message to Queue")
return err
}