TQueues Worker

TQueues worker

Usage:
tqueues_worker -h | --help
tqueues_worker --version
tqueues_worker --endpoint_url <endpoint_url>
tqueues_worker --queue <queue>
Options:
--endpoint_url <ENDPOINT_URL> TQueues dispatcher endpoint
--host <host> Rethinkdb host
--db <db> Rethinkdb database
--port <port> Rethinkdb port
--user <user> Rethinkdb user
--password <password> Rethinkdb password
--queue <QUEUE> Endpoint queue to listen on

-h --help Show this screen
-v --version Show version

Examples:
tqueues_worker --endpoint_url http://127.0.0.1:800/ --queue testqueue
class tqueues.worker.Job(endpoint_url, rethinkdb, data)[source]

Bases: object

Context manager that handles a job on the dispatcher.

This executes the given method (see Job.method) adding itself to it via “tq_parent_job” keyword argument. That means your worker methods need to accept a tq_parent_job.

That makes the job able to update information in the rethinkdb using the update method of tq_parent_job.
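As a minimal sketch of what such a worker method might look like, the function below accepts the tq_parent_job keyword argument and reports its result through the job's update method. The function name count_words and the keys in the update payload are hypothetical; only the tq_parent_job keyword and update(data) come from this documentation.

```python
def count_words(text, tq_parent_job=None):
    """Hypothetical worker method: counts the words in a string."""
    total = len(text.split())
    if tq_parent_job is not None:
        # update(data) is the documented Job method; the payload keys
        # ("status", "words") are made up for this illustration.
        tq_parent_job.update({"status": "done", "words": total})
    return total
```

Defaulting tq_parent_job to None keeps the function callable outside of a worker, for instance in unit tests.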

method

Finds and imports the method specified in an importable format (foo.bar:baz would use baz from foo.bar)
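The "module:attribute" format can be resolved with the standard library. The following is a sketch of that lookup under the assumption that the string always contains exactly one colon; resolve_method is a hypothetical helper name, not part of tqueues.

```python
import importlib


def resolve_method(path):
    """Resolve 'foo.bar:baz' to the attribute baz of module foo.bar."""
    module_name, _, attribute = path.partition(":")
    if not module_name or not attribute:
        raise ValueError("expected 'module.path:attribute', got %r" % path)
    # import_module raises ImportError if the module cannot be imported,
    # and getattr raises AttributeError if the attribute is missing.
    module = importlib.import_module(module_name)
    return getattr(module, attribute)
```

For example, resolve_method("os.path:basename") returns the same function object as os.path.basename.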

update(data)[source]

Update the remote object in the database with the provided data. If rethinkdb connection parameters are specified, they’re used instead of the endpoint’s PATCH method.

Warning

If the rethinkdb parameters you specify here differ from the ones used on the endpoint, it’ll lead to unexpected behavior.

work()[source]

Run the job

Warning

Job MUST accept a named parameter job containing this object.

class tqueues.worker.Worker(endpoint_url, rethinkdb, queue)[source]

Bases: object

Implements a simple worker over the HTTP API.

run_forever()[source]

We iterate over the job dispatcher object assigned to us.

tqueues.worker.client()[source]

Client

tqueues.worker.test(*args, **kwargs)[source]

Test function for worker

TQueues Dispatcher

TQueues job dispatcher.

Usage:
tqueues_dispatcher --db "db" --queue "queue" --host "127.0.0.1"
tqueues_dispatcher --db "db" --queue "queue" --port 28015
tqueues_dispatcher --db "db" --queue "queue" --user 'user'
tqueues_dispatcher --db "db" --queue "queue" --password 'password'
tqueues_dispatcher --db "db" --queue "queue" --loglevel INFO
tqueues_dispatcher --db "db" --queue "queue"
tqueues_dispatcher -h | --help
tqueues_dispatcher --version
Options:
--host <host> Rethinkdb host
--db <db> Rethinkdb database
--port <port> Rethinkdb port
--user <user> Rethinkdb user
--password <password> Rethinkdb password

--allowed_domains 'foo.com,bar.com' Allowed domains
--loglevel (DEBUG|INFO) Loglevel
-h --help Show this screen
--version Show version

Examples:
tqueues_dispatcher --host localhost --db foo --port 28015 --user foo --password bar --loglevel INFO --allowed_domains 'foo.com,bar.com'
class tqueues.dispatcher.Dispatcher(request)[source]

Bases: aiohttp.web_urldispatcher.View

Exposes an API for the workers, which run as simple consumers. When a worker finishes its task, it consumes the next one.

Accepts rethinkdb.connect args. For a more secure environment, you should ONLY HAVE ONE DISPATCHER per database in rethinkdb.

delete()[source]
DELETE /?queue={string:queue}&id={string:id}

Marks a task as completed

Example request:

DELETE /?id=foo&queue=bar
Host: example.com
Accept: application/json, text/javascript

Example response:

HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript

ok
Query id:id to mark as completed
Query queue:queue (table) to work on
Statuscode 200:This method should always return 200
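A client could build the DELETE request with the standard library as sketched below. The helper name build_delete_request is hypothetical, and the host is taken from the example request above; only the queue and id query parameters come from this documentation.

```python
import urllib.parse
import urllib.request


def build_delete_request(endpoint_url, queue, task_id):
    """Build (but do not send) a DELETE request marking a task as completed."""
    # urlencode escapes the values, so ids with special characters are safe.
    query = urllib.parse.urlencode({"queue": queue, "id": task_id})
    return urllib.request.Request(endpoint_url + "?" + query, method="DELETE")


request = build_delete_request("http://example.com/", "bar", "foo")
# Passing the request to urllib.request.urlopen(request) would send it.
```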
get()[source]
GET /?queue={string:queue}

Gets a task from the dispatcher.

If we run into a race condition, it’ll return a 404 for the client to retry

Example request:

GET /?queue=foo
Host: example.com
Accept: application/json, text/javascript

Example response:

HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript
{
  'queue': 'foo', 'method': 'method.to.execute',
  'args': (*args), 'kwargs': {**kwargs}
}

Note

method.to.execute must be a method importable on the workers’ side

Query queue:queue (table) where to listen in the database
Statuscode 200:no error
Statuscode 404:Race condition happened, task is no longer present
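Since a 404 signals a race condition that the client is expected to retry, a polling client might look like the sketch below. The function name fetch_task, the retry count, and the delay are assumptions; the opener parameter exists only so the network call can be replaced in tests. The response body is returned as raw text because its exact encoding is not specified here.

```python
import time
import urllib.error
import urllib.parse
import urllib.request


def fetch_task(endpoint_url, queue, retries=5, delay=0.5,
               opener=urllib.request.urlopen):
    """Request a task, retrying when the dispatcher answers 404."""
    url = endpoint_url + "?" + urllib.parse.urlencode({"queue": queue})
    for _ in range(retries):
        try:
            with opener(url) as response:
                return response.read().decode("utf-8")
        except urllib.error.HTTPError as error:
            if error.code != 404:
                raise  # only the documented race condition is retried
            time.sleep(delay)
    return None  # every attempt hit the race condition
```

Returning None after the last attempt leaves it to the caller to decide whether to keep polling or give up.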
patch()[source]

If the job is unable to access rethinkdb directly, this endpoint can be used to update the data. See Job documentation on Job.update.

post()[source]

Creates a new task. It just dumps whatever it receives into the database after a brief format check.

{
  'queue': 'foo', 'method': 'method.to.execute',
  'args': (*args), 'kwargs': {**kwargs}
}

POST /

Creates a new task in the dispatcher.

If the table does not exist, it returns a 501 for the client to handle it

Example request:

POST /
Host: example.com
Accept: application/json, text/javascript

{
  'queue': 'foo', 'method': 'method.to.execute',
  'args': (*args), 'kwargs': {**kwargs}
}

Example response:

HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript

ok

Note

method.to.execute must be a method importable on the workers’ side

<json string queue:
 Queue (table) to add this task to
<json array args:
 List of positional arguments to pass to method
<json array kwargs:
 List of keyword arguments to pass to method
<json string method:
 Method to import and execute
Statuscode 200:No error
Statuscode 501:Table does not exist
Statuscode 400:Not all params have been specified
Statuscode 404:No more tasks in the queue, retry later
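The request body for POST could be assembled as in the following sketch. It assumes the body is JSON-encoded and that kwargs is sent as a JSON object, neither of which is confirmed by the field list above. The helpers build_task and missing_fields are hypothetical; missing_fields only mirrors the kind of check that would lead to a 400, not the dispatcher's actual validation.

```python
import json

# The four fields listed in the field list above.
REQUIRED_FIELDS = ("queue", "method", "args", "kwargs")


def build_task(queue, method, args=(), kwargs=None):
    """Serialize a task description to a JSON request body."""
    task = {
        "queue": queue,
        "method": method,
        "args": list(args),
        "kwargs": dict(kwargs or {}),  # assumption: sent as a JSON object
    }
    return json.dumps(task).encode("utf-8")


def missing_fields(body):
    """Return the required fields that are absent from a request body."""
    task = json.loads(body)
    return [field for field in REQUIRED_FIELDS if field not in task]


body = build_task("foo", "foo.bar:baz", args=[1, 2], kwargs={"x": 1})
```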
put()[source]
PUT /?queue={string:queue}

Creates a queue if it does not exist.

Example request:

PUT /?queue=foo
Host: example.com
Accept: application/json, text/javascript

Example response:

HTTP/1.1 200 OK
Vary: Accept
Content-Type: text/javascript

ok
Query queue:queue (table) to create
Statuscode 200:This method should always return 200
tqueues.dispatcher.server()[source]

Starts main dispatch server

tqueues.dispatcher.wshandle(request)[source]

Websocket handler. Right now this only waits on the given queue and dumps ‘new_val’ on rethinkdb changes.