retask.queue

This module contains the primary Queue class, which is used to create and manage queues.

class retask.queue.Queue(name: str, config: dict[str, Any] | None = None)

Returns the Queue object with the given name. If the optional config dictionary with Redis server details is passed, the queue connects to that instance. By default it connects to localhost.
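
As a rough illustration of the optional config argument, the sketch below builds such a dictionary. The helper name `redis_config` is hypothetical, and the key names (`host`, `port`, `db`, `password`) are an assumption that should be checked against the installed retask version.

```python
def redis_config(host="localhost", port=6379, db=0, password=None):
    """Build a dictionary intended for the config argument of Queue.

    The key names used here are an assumption, not taken from this page.
    """
    return {"host": host, "port": port, "db": db, "password": password}


# Only the values that differ from the defaults need to be given.
config = redis_config(host="127.0.0.1", port=6380)
```

With retask installed, the dictionary would then be passed as `Queue('test', config)` before calling `connect()`.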

connect() -> bool

Creates the connection to the Redis server. Returns True if the connection works, otherwise False. It does not take any arguments.

Returns:

Boolean value

Note

After creating the Queue object the user should call the connect method to create the connection.

>>> from retask import Queue
>>> q = Queue('test')
>>> q.connect()
True
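
Because connect() reports failure through its return value rather than an exception, callers may want to check it explicitly. A minimal sketch, assuming a hypothetical helper named `connect_or_raise` that accepts any object with a `connect()` method:

```python
def connect_or_raise(queue):
    """Call queue.connect() and raise if it returns a falsy value.

    `queue` is expected to be a retask Queue, but any object with a
    connect() method returning a boolean works.
    """
    if not queue.connect():
        raise RuntimeError("could not connect to the Redis server")
    return queue
```

With retask installed, `q = connect_or_raise(Queue('test'))` would yield a connected queue or fail early.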

dequeue() -> Task | None

Returns a Task object from the queue. Returns None if the queue is empty.

Returns:

Task object from the queue

If the queue is not connected, it raises retask.ConnectionError.

>>> from retask import Queue
>>> q = Queue('test')
>>> q.connect()
True
>>> t = q.dequeue()
>>> print(t.data)
{'name': 'kushal'}
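
Since dequeue() returns None on an empty queue, code that reads `t.data` should check for None first. The sketch below drains a queue in that way; `drain` is a hypothetical helper, and it relies only on the `dequeue()` method and the `data` attribute shown above.

```python
def drain(queue):
    """Dequeue until dequeue() returns None and collect each task's data."""
    collected = []
    while True:
        task = queue.dequeue()
        if task is None:  # the queue is empty
            break
        collected.append(task.data)
    return collected
```

The queue passed in must already be connected, otherwise dequeue() raises retask.ConnectionError as described above.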

enqueue(task)

Enqueues the given Task object to the queue and returns a Job object.

Parameters:

task – Task object

Returns:

Job object

If the queue is not connected then it will raise retask.ConnectionError.

>>> from retask import Queue
>>> q = Queue('test')
>>> q.connect()
True
>>> from retask.task import Task
>>> task = Task({'name':'kushal'})
>>> job = q.enqueue(task)

find(obj)

Returns the index of the given object in the queue. The object may be a string, which is searched for inside each task.

Parameters:

obj – the object we are looking for

Returns:

-1 if the object is not found, otherwise the index of the task
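
The -1 sentinel means the return value should not be used directly in a truth test, since index 0 is a valid position. A minimal sketch, with `is_queued` as a hypothetical helper name:

```python
def is_queued(queue, obj):
    """Return True if queue.find(obj) reports a position other than -1."""
    # Compare against -1 explicitly: an index of 0 is a valid match.
    return queue.find(obj) != -1
```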

property length: int

Gives the length of the queue.

If the queue is not connected then it will raise retask.ConnectionError.

names() -> list[bytes]

Returns a list of the available queues, or None if no queues are found. Note that only queues with at least one enqueued item are listed.
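
The signature indicates that the names come back as bytes, so they usually need decoding before display. A sketch under that assumption, where `queue_names` is a hypothetical helper and UTF-8 is an assumed encoding:

```python
def queue_names(queue):
    """Return the queue names as str, treating a None result as no queues."""
    raw = queue.names() or []
    # Decode bytes entries; leave anything that is already str untouched.
    return [n.decode("utf-8") if isinstance(n, bytes) else n for n in raw]
```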

send(task: Task, result, expire: int = 60)

Sends the result back to the producer. This should be called only if you want to return the result asynchronously.

Parameters:
  • task – Task object

  • result – Result data to be sent back. Must be JSON serializable.

  • expire – Time in seconds after which the key expires. Default is 60 seconds.
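
On the worker side, send() is typically paired with wait(): take a task, compute something from its data, and send the result back. The following is a minimal sketch of one such step; `serve_one` and `handler` are hypothetical names, and only the documented `wait()`, `send()` and `task.data` are used.

```python
def serve_one(queue, handler, wait_time=0, expire=60):
    """Handle a single task and send its result back to the producer.

    Returns True if a task was handled, False if wait() timed out.
    """
    task = queue.wait(wait_time)
    if task is False:  # wait() timed out
        return False
    result = handler(task.data)  # the result must be JSON serializable
    queue.send(task, result, expire)
    return True
```

A worker could call `serve_one(q, handler)` in a loop, with `q` being a connected Queue.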

wait(wait_time=0) -> Task | bool

Returns a Task object from the queue. Returns False if it times out.

Parameters:

wait_time – Time in seconds to wait. The default, 0, waits indefinitely.

Returns:

Task object from the queue, or False if it times out.

>>> from retask import Queue
>>> q = Queue('test')
>>> q.connect()
True
>>> task = q.wait()
>>> print(task.data)
{'name': 'kushal'}

Note

This is a blocking call. You can specify the wait_time argument to set a timeout.
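
With a non-zero wait_time, the False return value can serve as a stop signal for a consumer loop. A sketch of that pattern, where `iter_tasks` is a hypothetical generator built only on the documented `wait()` call:

```python
def iter_tasks(queue, wait_time=5):
    """Yield tasks as they arrive and stop once wait() times out.

    With wait_time=0 the underlying call blocks indefinitely, so the
    generator would never finish on its own.
    """
    while True:
        task = queue.wait(wait_time)
        if task is False:  # timed out: no task arrived in time
            return
        yield task
```

For example, `for task in iter_tasks(q, wait_time=10): print(task.data)` would process tasks until the queue stays empty for ten seconds.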

class retask.queue.Job(rdb)

Job object containing the result from the workers.

Parameters:

rdb – The underlying redis connection.

property result

Returns the result from the worker for this job. This is used to pass the result back asynchronously.

wait(wait_time=0)

Blocking call to check if the worker returns the result. One can use job.result after this call returns True.

Parameters:

wait_time – Time in seconds to wait. The default, 0, waits indefinitely.

Returns:

True or False.

Note

This is a blocking call. You can specify the wait_time argument to set a timeout.
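
On the producer side, the Job returned by enqueue() can be combined with wait() and result as sketched below. The helper name `result_or_default` and its `default` parameter are hypothetical. The code assumes only that `wait()` returns True or False and that `result` is readable after a True return.

```python
def result_or_default(job, wait_time=10, default=None):
    """Block on job.wait() and return job.result, or default on timeout."""
    if job.wait(wait_time):
        return job.result
    return default
```

A producer might use it as `result_or_default(q.enqueue(task), wait_time=30)`, with `q` being a connected Queue.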