Test the RabbitMQ service with the pika library in GitHub Actions

Hello there,

I am running into an issue: I created a RabbitMQ service and a Python script to communicate with it using the pika library. However, I cannot get the pika script to establish a connection with RabbitMQ. Has anyone experienced the same? Any help in this direction is appreciated. I can also provide code snippets if that helps.

Thanks a million !
Roland

Hi @Roland-djee,
I found this previous ticket, which is related to the RabbitMQ service. Please check its solution.
If that service configuration does not work, please share your workflow YAML file here. It would be great if you could share a sample repo so that I can reproduce and investigate the issue.

Hi @yanjingzhu,

Thanks for your answer. As a matter of fact, I am aware of that solution and it works for me. The issue arises when trying to publish a message from a python script to a RabbitMQ service. Here are some snippets:

name: Run RabbitMQ messaging tests

on: push

jobs:
  container-test:
    runs-on: ubuntu-latest
    services:
      rabbitmq:
        image: rabbitmq:latest
        ports:
          - 5672/tcp
        # needed because the rabbitmq container does not provide a healthcheck
        options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: '3.8' # Version range or exact version of a Python version to use, using SemVer's version range syntax
       # - uses: getong/rabbitmq-action@v1.2
       #   with:
       #     rabbitmq version: '3.8.4-management'
       #     host port: 5672
       #     rabbitmq user: 'guest'
       #     rabbitmq password: 'guest'
       #     rabbitmq vhost: '/'
      - name: Install libs
        run: sudo  apt-get update && sudo apt-get install -y python3-pip && pip install pika pytest
      - name: Install cap and run simplequeue
        run: cd sandbox && pip install . && python simplequeue.py
        env:
          # get randomly assigned published ports
          RABBITMQ_PORT: ${{ job.services.rabbitmq.ports['5672'] }}
          AMQP_URL: amqp://localhost:${{ job.services.rabbitmq.ports[5672] }}

And the simplequeue.py:

import pika
import socket
import time
import os

# receiving message callback
def receiveMessage(channel, method, properties, body):
    print('[x] Received %r ' % body)


print('**** Running simplequeue.py ****')

isreachable = False
pingCounter = 0
print('**** first we see if we can reach the server')
while isreachable is False and pingCounter < 10:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # s.connect(('0.0.0.0',15672))
        s.connect(('0.0.0.0', 5672))
        isreachable = True
        print('[x] found the server')
    except socket.error as e:
        time.sleep(2)
        pingCounter += 1
    s.close()

if isreachable is False:
    print('[!] Could not get a socket for <host>:5672')
else:
    print('[x] socket to host worked')


# rabbitmq_port = os.environ['RABBITMQ_PORT']
amqp_url = os.environ['AMQP_URL']
# print(rabbitmq_port)
print(amqp_url)

urlConnectionParameters = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(urlConnectionParameters)

Thanks for your help
Roland

Does the Python script work on your local machine? Before you use GitHub Actions, you need to make sure the Python script is correct.
What is the error message you got in the workflow run logs?
Did the service keep running while simplequeue.py was executing?
You could add some debug logging lines to your simplequeue.py to locate the error. You could also enable step debug logging for your workflow to check whether it reveals more detailed information.
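
A minimal sketch of the kind of debug logging meant here, assuming the connection attempt is wrapped in a small helper. The names try_connect and connect are hypothetical; in simplequeue.py, connect would be something like lambda: pika.BlockingConnection(parameters). The idea is to log the exception type and traceback rather than a generic message.

```python
import logging

logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
log = logging.getLogger("simplequeue")


def try_connect(connect):
    """Call connect() and return (connection, error).

    `connect` is any zero-argument callable that opens a connection,
    e.g. lambda: pika.BlockingConnection(parameters) (assumed usage).
    """
    try:
        connection = connect()
    except Exception as exc:
        # log.exception records the message plus the full traceback
        log.exception("connection attempt failed: %r", exc)
        return None, exc
    log.debug("connection established")
    return connection, None
```

With this in place, the workflow log should show which exception was raised instead of only a custom failure message.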

Yes, locally everything works fine. Here is a message I get from the workflow:

Run cd sandbox && pip install . && python simplequeue.py
  cd sandbox && pip install . && python simplequeue.py
  shell: /bin/bash -e {0}
  env:
    pythonLocation: /opt/hostedtoolcache/Python/3.8.3/x64
    RABBITMQ_PORT: 5672
    AMQP_URL: amqp://rabbitmq:5672
Processing /home/runner/work/cap/cap/sandbox
Using legacy setup.py install for cap, since package 'wheel' is not installed.
Installing collected packages: cap
    Running setup.py install for cap: started
    Running setup.py install for cap: finished with status 'done'
Successfully installed cap-0.0.0
**** Running simplequeue.py ****
amqp://rabbitmq:5672
[x] about to build a blocking connection
[!] We could not get a connection

This is using:

name: Run RabbitMQ messaging tests

on: push

jobs:
  container-test:
    runs-on: ubuntu-latest
    services:
      rabbitmq:
        image: rabbitmq:latest
        ports:
          - 5672:5672
        # needed because the rabbitmq container does not provide a healthcheck
        options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: '3.8' # Version range or exact version of a Python version to use, using SemVer's version range syntax
      - name: Install libs
        run: sudo  apt-get update && sudo apt-get install -y python3-pip && pip install pika pytest
      - name: Install cap and run simplequeue
        run: cd sandbox && pip install . && python simplequeue.py
        env:
          # get randomly assigned published ports
          RABBITMQ_PORT: ${{ job.services.rabbitmq.ports[5672] }}
          AMQP_URL: amqp://rabbitmq:${{ job.services.rabbitmq.ports[5672] }}

(Please note the change in the hostname and the port)
And:

import pika
import socket
import time
import os

# receiving message callback
def receiveMessage(channel, method, properties, body):
    print('[x] Received %r ' % body)


print('**** Running simplequeue.py ****')

amqp_url = os.environ['AMQP_URL']
print(amqp_url)

urlConnectionParameters = pika.URLParameters(amqp_url)

# # ### choose which parameter set
parameters = urlConnectionParameters
   
try:
    print('[x] about to build a blocking connection')
    connection = pika.BlockingConnection(parameters)
    print('[x] built a connection')
    channel = connection.channel()
    channel.queue_declare(queue='first_queue')

    channel.basic_publish(exchange='', routing_key='first_queue', body='Hi Queue')
    print('[x] send a first message to the queue')


    time.sleep(5)

    channel.basic_consume(queue='first_queue', auto_ack=True, on_message_callback=receiveMessage)

    print('[x] Waiting for messages')

    channel.start_consuming()
    connection.close()
except:
    print("[!] We could not get a connection")
    pass

As you can see, I get the error while trying to establish a connection.

How do I check whether the service is still running? I assumed services were persistent across runs. That is why some of the health checks come back positive.

I haven’t tried step debug logging yet. Maybe it is worth it?

Thanks for the work though.

Roland
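
A hedged note on the hostname change: according to the GitHub Actions documentation on service containers, the service label (rabbitmq) is only usable as a hostname when the job itself runs inside a container. When the steps run directly on the runner (runs-on: ubuntu-latest with no container: key, as in the workflow above), the service is reached through localhost and the mapped host port. The sketch below builds the URL from environment variables; RABBITMQ_HOST and the function name are assumptions and not part of the original workflow.

```python
import os


def amqp_url_from_env(env=None):
    """Build an AMQP URL from environment variables.

    RABBITMQ_PORT is the variable set in the workflow above.
    RABBITMQ_HOST is a hypothetical extra variable; it defaults to
    localhost, which is how a service is reached when the steps run
    directly on the runner rather than inside a container.
    """
    env = os.environ if env is None else env
    host = env.get("RABBITMQ_HOST", "localhost")
    port = int(env.get("RABBITMQ_PORT", "5672"))
    return f"amqp://{host}:{port}"
```

The resulting string can be passed to pika.URLParameters in the same way as AMQP_URL.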

Sorry for the delayed response. I have reproduced your issue and reported it to the engineering team. I will keep you updated when they give me any advice.

Hi @yanjingzhu, thanks for this !

@Roland-djee it may be worth trying Node.js, as in https://github.com/jpwilliams/remit/blob/9c03a9d21ee16053149e56dc9155028d411cc3df/lib/Remit.js#L59, to see if it works.
https://github.com/actions/example-services/issues/3#issuecomment-638135259 seems to say that it worked.
I am not an expert in the pika library, but trying that example would at least tell us whether this is a configuration issue with the pika library.

@yaananth thanks for the tip. I can try it though. But locally everything works fine with pika :woman_shrugging:

What was the result of the Node.js approach? Could it connect to the RabbitMQ service?

@yanjingzhu Sorry, I’ve been dragged into other work. Will try this afternoon. Do you have any answer from the engineering team ?

@Roland-djee it actually works; I just tried it on GitHub Actions.
You are connecting to the wrong port (s.connect(('0.0.0.0', 5672))). The host port is randomly assigned, so the script needs to read it from RABBITMQ_PORT.

Try:

import socket
import time
import os
import pika

# receiving message callback
def receiveMessage(channel, method, properties, body):
    print('[x] Received %r ' % body)

rabbitmq_port = os.environ['RABBITMQ_PORT']
amqp_url = os.environ['AMQP_URL']
print(rabbitmq_port)
print(amqp_url)

print('**** Running simplequeue.py ****')

isreachable = False
pingCounter = 0
print('**** first we see if we can reach the server')
while isreachable is False and pingCounter < 10:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # s.connect(('0.0.0.0',15672))
        print(rabbitmq_port)
        print(amqp_url)
        s.connect(('localhost', int(rabbitmq_port)))
        isreachable = True
        print('[x] found the server')
    except socket.error as e:
        time.sleep(2)
        pingCounter += 1
    s.close()

if isreachable is False:
    print('[!] Could not get a socket for <host>:5672')
else:
    print('[x] socket to host worked')



urlConnectionParameters = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(urlConnectionParameters)

result:

32768
amqp://localhost:32768
**** Running simplequeue.py ****
**** first we see if we can reach the server
32768
amqp://localhost:32768
[x] found the server
[x] socket to host worked
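
The reachability check above can be condensed into a small helper. This is a sketch under the assumption that the mapped port is passed in through RABBITMQ_PORT as in the workflow; wait_for_port is a hypothetical name. It only shows that something accepts TCP connections on that port, not that the broker is ready for AMQP traffic.

```python
import socket
import time


def wait_for_port(host, port, retries=10, delay=2.0, timeout=2.0):
    """Return True once a TCP connection to (host, port) succeeds.

    Tries up to `retries` times, sleeping `delay` seconds between
    failed attempts, and returns False if every attempt fails.
    """
    for attempt in range(retries):
        try:
            # the context manager closes the probe socket right away
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:
            if attempt < retries - 1:
                time.sleep(delay)
    return False


# Assumed usage inside simplequeue.py:
#   port = int(os.environ["RABBITMQ_PORT"])
#   if not wait_for_port("localhost", port):
#       raise SystemExit(f"could not reach localhost:{port}")
```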

Hi @yaananth @yanjingzhu,

Actually @yaananth, your solution works for me, thanks for pointing that out. Curiously, I am almost certain that I had tried it. Anyway, I made some progress on this. Consider the following script:

import socket
import time
import os
import pika

# receiving message callback
def receiveMessage(channel, method, properties, body):
    print('[x] Received %r ' % body)

rabbitmq_port = os.environ['RABBITMQ_PORT']
amqp_url = os.environ['AMQP_URL']
print(rabbitmq_port)
print(amqp_url)

print('**** Running simplequeue.py ****')

isreachable = False
pingCounter = 0
print('**** first we see if we can reach the server')
while isreachable is False and pingCounter < 10:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # s.connect(('0.0.0.0',15672))
        print(rabbitmq_port)
        print(amqp_url)
        s.connect(('localhost', int(rabbitmq_port)))
        isreachable = True
        print('[x] found the server')
    except socket.error as e:
        time.sleep(2)
        pingCounter += 1
    s.close()

if isreachable is False:
    print('[!] Could not get a socket for <host>:5672')
else:
    print('[x] socket to host worked')


    
print('[x] about to build a blocking connection')
urlConnectionParameters = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(urlConnectionParameters)
print('[x] built a connection')
channel = connection.channel()
channel.queue_declare(queue='first_queue')

channel.basic_publish(exchange='', routing_key='first_queue', body='Hi Queue')
print('[x] send a first message to the queue')


time.sleep(5)
    
channel.basic_consume(queue='first_queue', auto_ack=True, on_message_callback=receiveMessage)

print('[x] Waiting for messages')

channel.start_consuming()
connection.close()

It makes the workflow hang indefinitely. I have narrowed it down to the presence of the callback definition. If I remove (comment out) the callback, the workflow proceeds until channel.basic_consume as per your solution.

So far, this is still a mystery.

Thanks for your help.
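
One likely explanation for the hang, offered as a hedged note: in pika, BlockingChannel.start_consuming() blocks and keeps dispatching callbacks until the consumers are cancelled, so the script never reaches connection.close() and the workflow step never finishes. Below is a minimal sketch of a consumer that stops after the first message. consume_one is a hypothetical helper, and channel is assumed to be a pika BlockingChannel.

```python
def consume_one(channel, queue):
    """Consume a single message from `queue` and return its body.

    `channel` is assumed to be a pika BlockingChannel. The callback
    calls stop_consuming(), which makes start_consuming() return
    instead of blocking forever.
    """
    received = []

    def on_message(ch, method, properties, body):
        received.append(body)
        ch.stop_consuming()

    channel.basic_consume(
        queue=queue, auto_ack=True, on_message_callback=on_message
    )
    channel.start_consuming()  # returns once on_message has stopped it
    return received[0] if received else None


# Assumed usage in simplequeue.py, replacing the basic_consume and
# start_consuming calls:
#   body = consume_one(channel, 'first_queue')
#   print('[x] Received %r' % body)
#   connection.close()
```

Note that with a real broker this still waits until at least one message arrives, so it suits the script above, where a message is published before consuming.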