RMQ Consumer (C#)

This code sets up a consumer to receive messages from a RabbitMQ queue and save the messages as JSON files. The consumer uses a ConnectionFactory to create a connection to the RabbitMQ server, and then creates a EventingBasicConsumer to receive messages from the queue. When a message is received, the consumer converts the message from a System.ReadOnlyMemory<byte> value to a byte[] value, and then uses the System.Text.Encoding.UTF8.GetString method to convert the byte[] value to a string.

The code then gets the current date and time in the UTC +0 time zone and converts it to a DateTime object. The DateTime object is then converted to a string in the desired format and used to generate the file path for the JSON file. Finally, the File.WriteAllText method is used to save the JSON string to the file.

The CreateConnectionFactory method is a helper method that creates a ConnectionFactory instance with the specified connection parameters, such as the host name, port, user name, password, and virtual host. The ConnectionFactory is then used to create the connection to the RabbitMQ server.

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;

namespace RmqConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            // Set up RabbitMQ connection
            var factory = CreateConnectionFactory("HostName", 5672, "email", "password", "VHost");

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Set up consumer to receive messages from the queue
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // Convert the System.ReadOnlyMemory<byte> value to a byte[] value
                    var body = ea.Body.ToArray();

                    // Parse the JSON string into an object
                    var updateObject = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body));

                    // Serialize the object back to a string, with indentation and line breaks
                    var beautifiedUpdate = JsonConvert.SerializeObject(updateObject, Formatting.Indented);

                    // Get the current date and time in the UTC +0 time zone
                    DateTime utcTime = DateTime.UtcNow;

                    // Convert the tick value to a DateTime object
                    DateTime dateTime = DateTime.FromBinary(utcTime.Ticks);

                    // Convert the DateTime object to a string in the desired format
                    string dateTimeString = dateTime.ToString("yyyy-MM-dd'T'HH-mm-ss.fff'Z'");

                    // Generate the file path using the formatted date and time string
                    string filePath = Path.Combine("<FilesPath>", dateTimeString + ".json");

                    // Save the beautified JSON string to a file
                    File.WriteAllText(filePath, beautifiedUpdate);
                };


                channel.BasicConsume(queue: "_QUEUE_", autoAck: true, consumer: consumer);

                // Keep the program running
                Console.ReadLine();
            }
        }

        private static ConnectionFactory CreateConnectionFactory(string hostName, int port, string userName, string password, string virtualHost)
        {
            return new ConnectionFactory()
            {
                HostName = hostName,
                Port = port,
                UserName = userName,
                Password = password,
                AutomaticRecoveryEnabled = true,
                VirtualHost = virtualHost,
                RequestedHeartbeat = TimeSpan.FromSeconds(580),
                NetworkRecoveryInterval = TimeSpan.FromSeconds(1)
            };
        }
    }
}

Last updated