RMQ Consumer (GO)

In order to run GO program we need to do the following actions:

  1. Make sure to have the github.com/streadway/amqp package imported and available in your Go environment before running the code. You can achieve this by running the command go run main.go in your terminal, or you can add a go.mod file and include the package there. This will ensure that the necessary dependencies are properly managed and the code can be executed without any issues.

    module main
    
    go 1.19
    
    require github.com/streadway/amqp v1.1.0
  2. Make sure to adjust the following fields as described on documentation : * <UserName> = username * <Password> = password * <HostName> = OS - InPlay: inplay-rmq.lsports.eu / PreMatch: prematch-rmq.lsports.eu STM - InPlay: stm-inplay.lsports.eu / PreMatch: stm-prematch.lsports.eu * <VirtualHost> = OS = VirtualHost = "Customers" STM = VirtualHost = "StmPreMatch" / VirtualHost = "StmInPlay" * <PackageID> = should be a number between “_“ for example _100_

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"log"
    
    	"github.com/streadway/amqp"
    )
    
    // UpdateObject represents the structure of your update object in Go.
    type UpdateObject map[string]interface{}
    
    // Main function to start the application.
    func main() {
    	// Set up RabbitMQ connection parameters
    	rmqURL := "amqp://<UserName>:<Password>@<HostName>:5672/<VirtualHost>"
    	queueName := "_<PackageID>_"
    
    	// Establish the connection to RabbitMQ server
    	conn, err := amqp.Dial(rmqURL)
    	if err != nil {
    		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    	}
    	defer conn.Close()
    
    	// Create a channel
    	channel, err := conn.Channel()
    	if err != nil {
    		log.Fatalf("Failed to open a channel: %v", err)
    	}
    	defer channel.Close()
    
      
    	// Set up consumer to receive messages from the queue
    	msgs, err := consumeMessages(channel, queueName)
    	if err != nil {
    		log.Fatalf("Failed to register a consumer: %v", err)
    	}
    
    	// Consume messages from the queue and process them
    	processMessages(msgs)
    }
    
    
    // Function to set up consumer and receive messages from the queue.
    func consumeMessages(channel *amqp.Channel, queueName string) (<-chan amqp.Delivery, error) {
    	msgs, err := channel.Consume(
    		queueName, // queue
    		"",        // consumer
    		true,      // auto-ack
    		false,     // exclusive
    		false,     // no-local
    		false,     // no-wait
    		nil,       // args
    	)
    	return msgs, err
    }
    
    // Function to process messages received from the queue and print them to the console.
    // Here you define how you will handle the data.
    func processMessages(msgs <-chan amqp.Delivery) {
    	for msg := range msgs {
    		// Convert the message body to a byte array
    		body := msg.Body
    
    		// Check if the message is empty
    		if len(body) == 0 {
    			log.Println("Received an empty message, skipping...")
    			continue
    		}
    
    		// Parse the JSON string into an object
    		var updateObject UpdateObject
    		err := json.Unmarshal(body, &updateObject)
    		if err != nil {
    			log.Printf("Failed to parse the message: %v", err)
    			continue
    		}
    
    		// Serialize the object back to a string, with indentation and line breaks
    		beautifiedUpdate, err := json.MarshalIndent(updateObject, "", "    ")
    		if err != nil {
    			log.Printf("Failed to beautify the message: %v", err)
    			continue
    		}
    
    		// Print the beautified JSON string to the console
    		fmt.Println(string(beautifiedUpdate))
    	}
    }
    

If you want to verify you can run the code and you will see messages acknowledged on the console.

Last updated