commit a279ce84387793b42094780a8d6e39a75447d3f8 Author: امیرحسین مقیسه Date: Tue Feb 23 23:49:28 2021 +0330 initial diff --git a/.env.docker b/.env.docker new file mode 100644 index 0000000..a0430ab --- /dev/null +++ b/.env.docker @@ -0,0 +1,4 @@ +AMPQ_HOST="localhost" +AMPQ_PORT="5672" +AMPQ_USERNAME="Username" +AMPQ_PASSWORD='Password"' \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0f7b0f1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +# Created by https://www.toptal.com/developers/gitignore/api/go,vscode +# Edit at https://www.toptal.com/developers/gitignore?templates=go,vscode + +### Go ### +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +### Go Patch ### +/vendor/ +/Godeps/ + +### vscode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace +.env +# End of https://www.toptal.com/developers/gitignore/api/go,vscode \ No newline at end of file diff --git a/fail/fail.go b/fail/fail.go new file mode 100644 index 0000000..3e2e740 --- /dev/null +++ b/fail/fail.go @@ -0,0 +1,17 @@ +package fail + +import "log" + +//ShowError Fail +func ShowError(err error, msg string) { + if err != nil { + log.Printf("%s: %s", msg, err) + } +} + +//FailOnError Fail +func FailOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2aecd34 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module vnfco.ir/rabbit + +go 1.16 + +require ( + github.com/joho/godotenv v1.3.0 + github.com/streadway/amqp v1.0.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..43dc793 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/main.go b/main.go new file mode 100644 index 0000000..13b582a --- /dev/null +++ b/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "encoding/json" + "log" + "os" + + "github.com/joho/godotenv" + "vnfco.ir/rabbit/fail" + "vnfco.ir/rabbit/rabbit" +) + +func init() { + err := godotenv.Load(".env") + fail.FailOnError(err, "Can't Load ENV file") +} + +func main() { + _, channel := rabbit.ConnectToAMPQServerAndCreateChannel(os.Getenv("AMPQ_HOST"), os.Getenv("AMPQ_PORT"), os.Getenv("AMPQ_USERNAME"), os.Getenv("AMPQ_PASSWORD")) + queueHello := rabbit.CreateOrJoinSimpleQueue(channel, "hi") + queueReply := rabbit.CreateOrJoinSimpleQueue(channel, "reply") + msgs := rabbit.Listen(channel, queueHello) + + forever := make(chan bool) + + go func() { + for d := range msgs { + log.Printf("Received a message: %s with Headers : %s", d.Body, d.Headers) + BodyMap := map[string]string{ + "title": "Hello baby I love you", + "body": "I love Your More Than You Ever Think ", + } + jsonBody, err := json.Marshal(BodyMap) + fail.FailOnError(err, "Can Not Stringify Map") + rabbit.Publish(channel, queueReply, string(jsonBody), "application/json") + } + }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + <-forever +} diff --git a/rabbit/rabbit.go b/rabbit/rabbit.go new file mode 100644 index 0000000..13084f0 --- /dev/null +++ b/rabbit/rabbit.go @@ -0,0 +1,61 @@ +package rabbit + +import ( + "fmt" + + "github.com/streadway/amqp" + "vnfco.ir/rabbit/fail" +) + +//ConnectToAMPQServerAndCreateChannel to conncet to amq server and return connection +func ConnectToAMPQServerAndCreateChannel(host string, port string, username string, password string) (*amqp.Connection, *amqp.Channel) { + connection, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", username, password, host, port)) + fail.FailOnError(err, "Failed to connect to RabbitMQ") + channel, err := connection.Channel() + fail.FailOnError(err, "Failed to Create Channel to RabbitMQ") + return connection, channel +} + +// CreateOrJoinSimpleQueue A queue on rabbit mq +func CreateOrJoinSimpleQueue(Channel *amqp.Channel, QueueName string) *amqp.Queue { + queue, err := Channel.QueueDeclare( + QueueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + fail.FailOnError(err, "Failed to Create Queue Or Join") + return &queue +} + +//Listen on A queue in rabbit mq +func Listen(Channel *amqp.Channel, Queue *amqp.Queue) <-chan amqp.Delivery { + messages, err := Channel.Consume( + Queue.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + fail.ShowError(err, "Failed to Consume a Queue") + return messages +} + +//Publish : send message to the queue +func Publish(Channel *amqp.Channel, Queue *amqp.Queue, Body string, ContentType string) error { + err := Channel.Publish( + "", // exchange + Queue.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: ContentType, + Body: []byte(Body), + }) + fail.ShowError(err, "Failed to Send Message to Queue") + return err +}