TQueues Worker¶
TQueues worker
- Usage:
- tqueues_worker -h | –help tqueues_worker –version tqueues_worker –endpoint_url <endpoint_url> tqueues_worker –queue <queue>
- Options:
--endpoint_url <ENDPOINT_URL> TQueues dispatcher endpoint --host <host> Rethinkdb host --db <db> Rethinkdb databaes --port <port> Rethinkdb port --user <user> Rethinkdb user --password <password> Rethinkdb password --queue <QUEUE> Endpoint queue to listen on -h –help Show this screen -v –version Show version
- Examples:
- tqueues_worker –endpoint_url http://127.0.0.1:800/ –queue testqueue
-
class
tqueues.worker.
Job
(endpoint_url, rethinkdb, data)[source]¶ Bases:
object
Context manager that handles a job on the dispatcher.
This executes the given method (see Job.method) adding itself to it via “tq_parent_job” keyword argument. That means your worker methods need to accept a tq_parent_job.
That makes the job able to update information in the rethinkdb using the update method of tq_parent_job.
-
method
¶ Finds and imports the method specified in an importable format (foo.bar:baz would use baz from foo.bar)
-
TQueues Dispatcher¶
Tqueues job dispatcher.
- Usage:
- tqueues_dispatcher –db “db” –queue “queue” –host “127.0.0.1” tqueues_dispatcher –db “db” –queue “queue” –port 28015 tqueues_dispatcher –db “db” –queue “queue” –user ‘user’ tqueues_dispatcher –db “db” –queue “queue” –password ‘password’ tqueues_dispatcher –db “db” –queue “queue” –loglevel INFO tqueues_dispatcher –db “db” –queue “queue” tqueues_dispatcher -h | –help tqueues_dispatcher –version
- Options:
--host <host> Rethinkdb host --db <db> Rethinkdb databaes --port <port> Rethinkdb port --user <user> Rethinkdb user --password <password> Rethinkdb password –allowed_domains ‘foo.com,bar.com’ Allowed domains –loglevel (DEBUG|INFO) Loglevel -h –help Show this screen –version Show version
- Examples:
- tqueues_dispatcher –host localhost –db foo –port 28015 –user foo –password bar –loglevel INFO –allowed_domains ‘foo.com,bar.com’
-
class
tqueues.dispatcher.
Dispatcher
(request)[source]¶ Bases:
aiohttp.web_urldispatcher.View
Exposes an API for the workers that run as simple consumers When a worker finishes its task, consumes the next one.
Accepts
rethinkdb.connect
args. For a more secure environment, you should ONLY HAVE ONE DISPATCHER for database in rethinkdb.-
delete
()[source]¶ -
DELETE
/?queue={string:queue}&id={string:id}
¶
Marks a task as completed
Example request:
DELETE /?id=foo&queue=bar Host: example.com Accept: application/json, text/javascript
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript ok
Query id: id to mark as completed Query queue: queue (table) to work on Statuscode 200: This method always should return 200 -
-
get
()[source]¶ -
GET
/?queue={string:queue}
¶ Gets a task from the dispatcher.
If we run into a race condition, it’ll return a 404 for the client to retry
Example request:
GET /?queue=foo Host: example.com Accept: application/json, text/javascript
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript { 'queue': 'foo', method': 'method.to.execute', 'args': (*args), 'kwargs': {**kwargs} }
Note
method.to.execute must be a method importable in the workers’ side
Query queue: queue (table) where to listen in the database Statuscode 200: no error Statuscode 404: Race condition happened, task is no longer present -
-
patch
()[source]¶ In case the job is unable to access directly rethinkdb, we can use this endpoint to update the data. See Job documentation on Job.update.
-
post
()[source]¶ Creates a new task, just dumps whatever we have to the database after a brief format check
- {
- ‘queue’: ‘foo’, method’: ‘method.to.execute’, ‘args’: (args), ‘kwargs’: {kwargs}
}
-
POST
/
¶ Gets a task from the dispatcher.
If the table does not exist, it returns a 501 for the client to handle it
Example request:
POST / Host: example.com Accept: application/json, text/javascript { 'queue': 'foo', method': 'method.to.execute', 'args': (*args), 'kwargs': {**kwargs} }
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript ok
Note
method.to.execute must be a method importable in the workers’ side
<json string queue: Queue (table) to add this task to <json array args: List of positional arguments to pass to method <json array kwargs: List of keyword arguments to pass to method <json string method: Method to import and execute Statuscode 200: No error Statuscode 501: Table does not exist Statuscode 400: Not all params have been specified Statuscode 404: No more tasks in the queue, retry later
-
put
()[source]¶ -
PUT
/?queue={string:queue}
¶ Creates a queue if it does not exist.
Example request:
GET /?queue=foo Host: example.com Accept: application/json, text/javascript
Example response:
HTTP/1.1 200 OK Vary: Accept Content-Type: text/javascript ok
Query queue: queue (table) to create Statuscode 200: This method always should return 200 -
-