Welcome to Tqueues’s documentation!¶
TQueues¶
TQueues (reThinkdb Queues) is a simple python3.5+ library for queueing jobs and processing them in workers using rethinkdb.
Features¶
- Accepts coroutines as jobs
- Distributed
- Handles everything in a rethinkdb database object that can be reused at the worker
- Exposes changes via websockets
- Uses rethinkdb streaming “changes” method, wich is pretty efficient
- Uses asyncio, implements an async context manager for jobs and async iterator for the workers
+--> Worker
|
Rethindb --> Dispatcher ---> Worker
|
+--> Worker
Usage¶
TQueues provides two entry points, tqueues_dispatcher and tqueues_worker.
TQueues dispatcher must be accesible from all workers, and you need to start one worker per parallel task you need. They may be distributed (as long as they can reach the dispatcher)
TQueues worker¶
Usage:
tqueues_worker -h | --help
tqueues_worker --version
tqueues_worker --endpoint_url <endpoint_url>
tqueues_worker --queue <queue>
Options:
-h --help Show this screen
-v --version Show version
--endpoint_url <ENDPOINT_URL> TQueues dispatcher endpoint
--queue <QUEUE> Endpoint queue to listen on
Examples:
tqueues_worker --endpoint_url http://127.0.0.1:800/ --queue testqueue
TQueues job dispatcher¶
Usage:
tqueues_dispatcher --db "db" --host "127.0.0.1" --port 28015
tqueues_dispatcher --db "db" --host "127.0.0.1"
tqueues_dispatcher --db "db" --port 28015
tqueues_dispatcher --db "db" --user 'user'
tqueues_dispatcher --db "db" --password 'password'
tqueues_dispatcher -h | --help
tqueues_dispatcher --version
Options:
--host "127.0.0.1" Rethinkdb host
--db "db" Rethinkdb databaes
--port 28015 Rethinkdb port
--user 'user' Rethinkdb user
--password 'password' Rethinkdb password
--allowed_domains 'foo.com,bar.com' Allowed domains
--loglevel (DEBUG|INFO) Loglevel
-h --help Show this screen
--version Show version
Examples:
tqueues_dispatcher --host localhost --db foo --port 28015 --user foo --password bar --loglevel INFO --allowed_domains 'foo.com,bar.com'
TQueues Worker¶
TQueues worker
- Usage:
- tqueues_worker -h | –help tqueues_worker –version tqueues_worker –endpoint_url <endpoint_url> tqueues_worker –queue <queue>
- Options:
--endpoint_url <ENDPOINT_URL> TQueues dispatcher endpoint --host <host> Rethinkdb host --db <db> Rethinkdb databaes --port <port> Rethinkdb port --user <user> Rethinkdb user --password <password> Rethinkdb password --queue <QUEUE> Endpoint queue to listen on -h –help Show this screen -v –version Show version
- Examples:
- tqueues_worker –endpoint_url http://127.0.0.1:800/ –queue testqueue
-
class
tqueues.worker.
Job
(endpoint_url, rethinkdb, data)[source]¶ Bases:
object
Context manager that handles a job on the dispatcher.
This executes the given method (see Job.method) adding itself to it via “tq_parent_job” keyword argument. That means your worker methods need to accept a tq_parent_job.
That makes the job able to update information in the rethinkdb using the update method of tq_parent_job.
-
method
¶ Finds and imports the method specified in an importable format (foo.bar:baz would use baz from foo.bar)
-
TQueues Dispatcher¶
Tqueues job dispatcher.
- Usage:
- tqueues_dispatcher –db “db” –queue “queue” –host “127.0.0.1” tqueues_dispatcher –db “db” –queue “queue” –port 28015 tqueues_dispatcher –db “db” –queue “queue” –user ‘user’ tqueues_dispatcher –db “db” –queue “queue” –password ‘password’ tqueues_dispatcher –db “db” –queue “queue” –loglevel INFO tqueues_dispatcher –db “db” –queue “queue” tqueues_dispatcher -h | –help tqueues_dispatcher –version
- Options:
--host <host> Rethinkdb host --db <db> Rethinkdb databaes --port <port> Rethinkdb port --user <user> Rethinkdb user --password <password> Rethinkdb password –allowed_domains ‘foo.com,bar.com’ Allowed domains –loglevel (DEBUG|INFO) Loglevel -h –help Show this screen –version Show version
- Examples:
- tqueues_dispatcher –host localhost –db foo –port 28015 –user foo –password bar –loglevel INFO –allowed_domains ‘foo.com,bar.com’
-
class
tqueues.dispatcher.
Dispatcher
(request)[source]¶ Bases:
aiohttp.web_urldispatcher.View
Exposes an API for the workers that run as simple consumers When a worker finishes its task, consumes the next one.
Accepts
rethinkdb.connect
args. For a more secure environment, you should ONLY HAVE ONE DISPATCHER for database in rethinkdb.-
delete
()[source]¶ -
DELETE
/?queue={string:queue}&id={string:id}
¶
Marks a task as completed
Example request:
DELETE /?id=foo&queue=bar Host: example.com Accept: application/json, text/javascript
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript ok
Query id: id to mark as completed Query queue: queue (table) to work on Statuscode 200: This method always should return 200 -
-
get
()[source]¶ -
GET
/?queue={string:queue}
¶ Gets a task from the dispatcher.
If we run into a race condition, it’ll return a 404 for the client to retry
Example request:
GET /?queue=foo Host: example.com Accept: application/json, text/javascript
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript { 'queue': 'foo', method': 'method.to.execute', 'args': (*args), 'kwargs': {**kwargs} }
Note
method.to.execute must be a method importable in the workers’ side
Query queue: queue (table) where to listen in the database Statuscode 200: no error Statuscode 404: Race condition happened, task is no longer present -
-
patch
()[source]¶ In case the job is unable to access directly rethinkdb, we can use this endpoint to update the data. See Job documentation on Job.update.
-
post
()[source]¶ Creates a new task, just dumps whatever we have to the database after a brief format check
- {
- ‘queue’: ‘foo’, method’: ‘method.to.execute’, ‘args’: (args), ‘kwargs’: {kwargs}
}
-
POST
/
¶ Gets a task from the dispatcher.
If the table does not exist, it returns a 501 for the client to handle it
Example request:
POST / Host: example.com Accept: application/json, text/javascript { 'queue': 'foo', method': 'method.to.execute', 'args': (*args), 'kwargs': {**kwargs} }
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript ok
Note
method.to.execute must be a method importable in the workers’ side
<json string queue: Queue (table) to add this task to <json array args: List of positional arguments to pass to method <json array kwargs: List of keyword arguments to pass to method <json string method: Method to import and execute Statuscode 200: No error Statuscode 501: Table does not exist Statuscode 400: Not all params have been specified Statuscode 404: No more tasks in the queue, retry later
-
put
()[source]¶ -
PUT
/?queue={string:queue}
¶ Creates a queue if it does not exist.
Example request:
GET /?queue=foo Host: example.com Accept: application/json, text/javascript
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript ok
Query queue: queue (table) to create Statuscode 200: This method always should return 200 -
-