# Code Samples

## Notes

* Replace credentials and `customerPackageId` placeholders with your assigned values.
* Queue name on both vhosts: `_{CustomerPackageId}_`

***

## Python

```python
import pika
import ssl
import json
import threading

HOST = "hyper-livescore.lsports.eu"
QNAME = f"_{customer_package_id}_"

def make_params(vhost: str, username: str, password: str) -> pika.ConnectionParameters:
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    return pika.ConnectionParameters(
        host=HOST,
        port=5671,
        virtual_host=vhost,
        credentials=pika.PlainCredentials(username, password),
        ssl_options=pika.SSLOptions(context, server_hostname=None),
        heartbeat=60,
        connection_attempts=5,
        retry_delay=10,
    )

fx_conn = pika.BlockingConnection(make_params("Fixtures", "fixtures-username", "fixtures-password"))
fx_ch = fx_conn.channel()
fx_ch.queue_declare(queue=QNAME, passive=True)

ic_conn = pika.BlockingConnection(make_params("Incidents", "incidents-username", "incidents-password"))
ic_ch = ic_conn.channel()
ic_ch.queue_declare(queue=QNAME, passive=True)

def on_fixture(ch, method, props, body):
    m = json.loads(body.decode("utf-8"))
    # upsert by m["body"]["fixtureId"] (delta-only)

def on_incident(ch, method, props, body):
    m = json.loads(body.decode("utf-8"))
    # correlate by m["body"]["fixtureId"] against your fixtures store

fx_ch.basic_consume(queue=QNAME, on_message_callback=on_fixture, auto_ack=True)
ic_ch.basic_consume(queue=QNAME, on_message_callback=on_incident, auto_ack=True)

threading.Thread(target=fx_ch.start_consuming, daemon=True).start()
print("Consuming from Fixtures and Incidents ...")
ic_ch.start_consuming()
```

***

## C\#

```csharp
using System;
using System.Text;
using System.Security.Authentication;
using System.Net.Security;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

string host = "hyper-livescore.lsports.eu";
string qname = $"_{customerPackageId}_";

ConnectionFactory FixturesFactory() => new ConnectionFactory
{
    HostName = host,
    Port = 5671,
    UserName = "fixtures-username",
    Password = "fixtures-password",
    VirtualHost = "Fixtures",
    Ssl = new SslOption
    {
        Enabled = true,
        Version = SslProtocols.Tls12,
        ServerName = host,
        AcceptablePolicyErrors =
            SslPolicyErrors.RemoteCertificateChainErrors |
            SslPolicyErrors.RemoteCertificateNameMismatch
    },
    RequestedHeartbeat = TimeSpan.FromSeconds(60),
    AutomaticRecoveryEnabled = true,
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
    TopologyRecoveryEnabled = true
};

ConnectionFactory IncidentsFactory() => new ConnectionFactory
{
    HostName = host,
    Port = 5671,
    UserName = "incidents-username",
    Password = "incidents-password",
    VirtualHost = "Incidents",
    Ssl = new SslOption
    {
        Enabled = true,
        Version = SslProtocols.Tls12,
        ServerName = host,
        AcceptablePolicyErrors =
            SslPolicyErrors.RemoteCertificateChainErrors |
            SslPolicyErrors.RemoteCertificateNameMismatch
    },
    RequestedHeartbeat = TimeSpan.FromSeconds(60),
    AutomaticRecoveryEnabled = true,
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
    TopologyRecoveryEnabled = true
};

void Consume(IModel ch, string queue, Action<string> handle)
{
    var consumer = new EventingBasicConsumer(ch);
    consumer.Received += (_, ea) => handle(Encoding.UTF8.GetString(ea.Body.ToArray()));
    ch.BasicConsume(queue: queue, autoAck: true, consumer: consumer);
}

using var fxConn = FixturesFactory().CreateConnection();
using var fxCh = fxConn.CreateModel();
fxCh.QueueDeclarePassive(qname);

using var icConn = IncidentsFactory().CreateConnection();
using var icCh = icConn.CreateModel();
icCh.QueueDeclarePassive(qname);

Consume(fxCh, qname, json => {
    // Upsert fixture by body.fixtureId (delta-only)
});

Consume(icCh, qname, json => {
    // Correlate by body.fixtureId against your fixtures store
});

Console.WriteLine("Consuming from Fixtures and Incidents ...");
Console.ReadLine();
```

***

## Java

```java
import com.rabbitmq.client.*;
import javax.net.ssl.*;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;

public class LiveScoreConsumers {

    private static SSLContext tls12TrustAll() throws Exception {
        TrustManager[] trustAll = new TrustManager[]{ new X509TrustManager() {
            public void checkClientTrusted(X509Certificate[] c, String a) {}
            public void checkServerTrusted(X509Certificate[] c, String a) {}
            public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
        }};
        SSLContext ctx = SSLContext.getInstance("TLSv1.2");
        ctx.init(null, trustAll, new SecureRandom());
        return ctx;
    }

    private static ConnectionFactory baseFactory(String host, String vhost, String user, String pass) throws Exception {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost(host);
        f.setPort(5671);
        f.setUsername(user);
        f.setPassword(pass);
        f.setVirtualHost(vhost);
        f.useSslProtocol(tls12TrustAll());
        f.setRequestedHeartbeat(60);
        f.setAutomaticRecoveryEnabled(true);
        f.setNetworkRecoveryInterval(10_000);
        return f;
    }

    public static void main(String[] args) throws Exception {
        String host = "hyper-livescore.lsports.eu";
        String q = "_" + customerPackageId + "_";

        ConnectionFactory f1 = baseFactory(host, "Fixtures", "fixtures-username", "fixtures-password");
        Connection fxConn = f1.newConnection();
        Channel fxCh = fxConn.createChannel();
        fxCh.queueDeclarePassive(q);

        ConnectionFactory f2 = baseFactory(host, "Incidents", "incidents-username", "incidents-password");
        Connection icConn = f2.newConnection();
        Channel icCh = icConn.createChannel();
        icCh.queueDeclarePassive(q);

        DeliverCallback fxCb = (tag, d) -> {
            String json = new String(d.getBody(), StandardCharsets.UTF_8);
            // upsert by body.fixtureId (delta-only)
        };

        DeliverCallback icCb = (tag, d) -> {
            String json = new String(d.getBody(), StandardCharsets.UTF_8);
            // correlate by body.fixtureId against your fixtures store
        };

        fxCh.basicConsume(q, true, fxCb, tag -> {});
        icCh.basicConsume(q, true, icCb, tag -> {});

        System.out.println("Consuming from Fixtures and Incidents ...");
        Thread.sleep(Long.MAX_VALUE);
    }
}
```
