Gear: Asynchronous Event-Driven Gearman Interface

This module implements an asynchronous event-driven interface to Gearman. It provides interfaces to build a client or worker, and access to the administrative protocol. The design approach is to keep it simple, with a relatively thin abstration of the Gearman protocol itself. It should be easy to use to build a client or worker that operates either synchronously or asynchronously.

The module also provides a simple Gearman server for use as a convenience in unit tests. The server is not designed for production use under load.

Client Example

To use the client interface, instantiate a Client, and submit a Job. For example:

import gear
client = gear.Client()
client.addServer('gearman.example.com')
client.waitForServer()  # Wait for at least one server to be connected

job = gear.Job("reverse", "test string")
client.submitJob(job)

The waitForServer() call is only necessary when running in a synchronous context. When running asynchronously, it is probably more desirable to omit that call and instead handle the NoConnectedServersError exception that submitJob may raise if no servers are connected at the time.

When Gearman returns data to the client, the Job object is updated immediately. Event handlers are called on the Client object so that subclasses have ample facilities for reacting to events synchronously.

Worker Example

To use the worker interface, create a Worker, register at least one function that the worker supports, and then wait for a Job to be dispatched to the worker.

An example of a Gearman worker:

import gear
worker = gear.Worker('reverser')
worker.addServer('gearman.example.com')
worker.registerFunction("reverse")

while True:
    job = worker.getJob()
    job.sendWorkComplete(job.arguments.reverse())

SSL Connections

For versions of Gearman supporting SSL connections, specify the files containing the SSL private key, public certificate, and CA certificate in the addServer() call. For example:

ssl_key = '/path/to/key.pem'
ssl_cert = '/path/to/cert.pem'
ssl_ca = '/path/to/ca.pem'
client.addServer('gearman.example.com', 4730, ssl_key, ssl_cert, ssl_ca)

All three files must be specified for SSL to be used.

API Reference

The following sections document the module’s public API. It is divided into sections focusing on implementing a client, a worker, using the administrative protocol, and then the classes that are common to all usages of the module.

Client Usage

The classes in this section should be all that are needed in order to implement a Gearman client.

Client Objects

class gear.Client[source]

A Gearman client.

You may wish to subclass this class in order to override the default event handlers to react to Gearman events. Be sure to call the superclass event handlers so that they may perform job-related housekeeping.

addServer(host, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None)

Add a server to the client’s connection pool.

Any number of Gearman servers may be added to a client. The client will connect to all of them and send jobs to them in a round-robin fashion. When servers are disconnected, the client will automatically remove them from the pool, continuously try to reconnect to them, and return them to the pool when reconnected. New servers may be added at any time.

This is a non-blocking call that will return regardless of whether the initial connection succeeded. If you need to ensure that a connection is ready before proceeding, see waitForServer().

When using SSL connections, all SSL files must be specified.

Parameters:
  • host (str) – The hostname or IP address of the server.
  • port (int) – The port on which the gearman server is listening.
  • ssl_key (str) – Path to the SSL private key.
  • ssl_cert (str) – Path to the SSL certificate.
  • ssl_ca (str) – Path to the CA certificate.
Raises ConfigurationError:
 

If the host/port combination has already been added to the client.

broadcast(packet)

Send a packet to all currently connected servers.

Parameters:packet (Packet) – The Packet to send.
getConnection()

Return a connected server.

Finds the next scheduled connected server in the round-robin rotation and returns it. It is not usually necessary to use this method external to the library, as more consumer-oriented methods such as submitJob already use it internally, but is available nonetheless if necessary.

Returns:The next scheduled Connection object.
Return type:Connection
Raises NoConnectedServersError:
 If there are not currently connected servers.
handleAdminRequest(request)

Handle an administrative command response from Gearman.

This method is called whenever a response to a previously issued administrative command is received from one of this client’s connections. It normally releases the wait lock on the initiating AdminRequest object.

Parameters:request (AdminRequest) – The AdminRequest that initiated the received response.
handleDisconnect(job)[source]

Handle a Gearman server disconnection.

If the Gearman server is disconnected, this will be called for any jobs currently associated with the server.

Parameters:packet (Job) – The Job that was running when the server disconnected.
handleEchoRes(packet)

Handle an ECHO_RES packet.

Causes the blocking Connection.echo() invocation to return.

Parameters:packet (Packet) – The Packet that was received.
Returns:None
handleError(packet)

Handle an ERROR packet.

Logs the error.

Parameters:packet (Packet) – The Packet that was received.
Returns:None
handleJobCreated(packet)[source]

Handle a JOB_CREATED packet.

Updates the appropriate Job with the newly returned job handle.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleOptionRes(packet)[source]

Handle an OPTION_RES packet.

Updates the set of options for the connection.

Parameters:packet (Packet) – The Packet that was received.
Returns:None.
handlePacket(packet)

Handle a received packet.

This method is called whenever a packet is received from any connection. It normally calls the handle method appropriate for the specific packet.

Parameters:packet (Packet) – The Packet that was received.
handleStatusRes(packet)[source]

Handle a STATUS_RES packet.

Updates the referenced Job with the returned data.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleWorkComplete(packet)[source]

Handle a WORK_COMPLETE packet.

Updates the referenced Job with the returned data and removes it from the list of jobs associated with the connection.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleWorkData(packet)[source]

Handle a WORK_DATA packet.

Updates the referenced Job with the returned data.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleWorkException(packet)[source]

Handle a WORK_Exception packet.

Updates the referenced Job with the returned data and removes it from the list of jobs associated with the connection.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleWorkFail(packet)[source]

Handle a WORK_FAIL packet.

Updates the referenced Job with the returned data and removes it from the list of jobs associated with the connection.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleWorkStatus(packet)[source]

Handle a WORK_STATUS packet.

Updates the referenced Job with the returned data.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
handleWorkWarning(packet)[source]

Handle a WORK_WARNING packet.

Updates the referenced Job with the returned data.

Parameters:packet (Packet) – The Packet that was received.
Returns:The Job object associated with the job request.
Return type:Job
sendPacket(packet, connection)

Send a packet to a single connection, removing it from the list of active connections if that fails.

Parameters:
  • packet (Packet) – The Packet to send.
  • connection (Connection) – The Connection on which to send the packet.
setOption(name, timeout=30)[source]

Set an option for all connections.

Parameters:
  • name (str) – The option name to set.
  • timeout (int) – How long to wait (in seconds) for a response from the server before giving up (default: 30 seconds).
Returns:

True if the option was set on all connections, otherwise False

Return type:

bool

shutdown()

Close all connections and stop all running threads.

The object may no longer be used after shutdown is called.

submitJob(job, background=False, precedence=0, timeout=30)[source]

Submit a job to a Gearman server.

Submits the provided job to the next server in this client’s round-robin connection pool.

If the job is a foreground job, updates will be made to the supplied Job object as they are received.

Parameters:
  • job (Job) – The Job to submit.
  • background (bool) – Whether the job should be backgrounded.
  • precedence (int) – Whether the job should have normal, low, or high precedence. One of PRECEDENCE_NORMAL, PRECEDENCE_LOW, or PRECEDENCE_HIGH
  • timeout (int) – How long to wait (in seconds) for a response from the server before giving up (default: 30 seconds).
Raises ConfigurationError:
 

If an invalid precendence value is supplied.

waitForServer()

Wait for at least one server to be connected.

Block until at least one gearman server is connected.

Job Objects

class gear.Job(name, arguments, unique=None)[source]

A job to run or being run by Gearman.

Parameters:
  • name (str) – The name of the job.
  • arguments (bytes) – The opaque data blob to be passed to the worker as arguments.
  • unique (str) – A byte string to uniquely identify the job to Gearman (optional).

The following instance attributes are available:

name (str)
The name of the job.
arguments (bytes)
The opaque data blob passed to the worker as arguments.
unique (str or None)
The unique ID of the job (if supplied).
handle (bytes or None)
The Gearman job handle. None if no job handle has been received yet.
data (list of byte-arrays)
The result data returned from Gearman. Each packet appends an element to the list. Depending on the nature of the data, the elements may need to be concatenated before use.
exception (bytes or None)
Exception information returned from Gearman. None if no exception has been received.
warning (bool)
Whether the worker has reported a warning.
complete (bool)
Whether the job is complete.
failure (bool)
Whether the job has failed. Only set when complete is True.
numerator (bytes or None)
The numerator of the completion ratio reported by the worker. Only set when a status update is sent by the worker.
denominator (bytes or None)
The denominator of the completion ratio reported by the worker. Only set when a status update is sent by the worker.
fraction_complete (float or None)
The fractional complete ratio reported by the worker. Only set when a status update is sent by the worker.
known (bool or None)
Whether the job is known to Gearman. Only set by handleStatusRes() in response to a getStatus() query.
running (bool or None)
Whether the job is running. Only set by handleStatusRes() in response to a getStatus() query.
connection (Connection or None)
The connection associated with the job. Only set after the job has been submitted to a Gearman server.

Worker Usage

The classes in this section should be all that are needed in order to implement a Gearman worker.

Worker Objects

class gear.Worker(worker_id)[source]

A Gearman worker.

Parameters:worker_id (str) – The worker ID to provide to Gearman (will appear in administrative command output).
addServer(host, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None)

Add a server to the client’s connection pool.

Any number of Gearman servers may be added to a client. The client will connect to all of them and send jobs to them in a round-robin fashion. When servers are disconnected, the client will automatically remove them from the pool, continuously try to reconnect to them, and return them to the pool when reconnected. New servers may be added at any time.

This is a non-blocking call that will return regardless of whether the initial connection succeeded. If you need to ensure that a connection is ready before proceeding, see waitForServer().

When using SSL connections, all SSL files must be specified.

Parameters:
  • host (str) – The hostname or IP address of the server.
  • port (int) – The port on which the gearman server is listening.
  • ssl_key (str) – Path to the SSL private key.
  • ssl_cert (str) – Path to the SSL certificate.
  • ssl_ca (str) – Path to the CA certificate.
Raises ConfigurationError:
 

If the host/port combination has already been added to the client.

broadcast(packet)

Send a packet to all currently connected servers.

Parameters:packet (Packet) – The Packet to send.
getConnection()

Return a connected server.

Finds the next scheduled connected server in the round-robin rotation and returns it. It is not usually necessary to use this method external to the library, as more consumer-oriented methods such as submitJob already use it internally, but is available nonetheless if necessary.

Returns:The next scheduled Connection object.
Return type:Connection
Raises NoConnectedServersError:
 If there are not currently connected servers.
getJob()[source]

Get a job from Gearman.

Blocks until a job is received. This method is re-entrant, so it is safe to call this method on a single worker from multiple threads. In that case, one of them at random will receive the job assignment.

Returns:The WorkerJob assigned.
Return type:WorkerJob.
Raises InterruptedError:
 If interrupted (by stopWaitingForJobs()) before a job is received.
handleAdminRequest(request)

Handle an administrative command response from Gearman.

This method is called whenever a response to a previously issued administrative command is received from one of this client’s connections. It normally releases the wait lock on the initiating AdminRequest object.

Parameters:request (AdminRequest) – The AdminRequest that initiated the received response.
handleEchoRes(packet)

Handle an ECHO_RES packet.

Causes the blocking Connection.echo() invocation to return.

Parameters:packet (Packet) – The Packet that was received.
Returns:None
handleError(packet)

Handle an ERROR packet.

Logs the error.

Parameters:packet (Packet) – The Packet that was received.
Returns:None
handleJobAssign(packet)[source]

Handle a JOB_ASSIGN packet.

Adds a WorkerJob to the internal queue to be picked up by any threads waiting in getJob().

Parameters:packet (Packet) – The Packet that was received.
handleJobAssignUnique(packet)[source]

Handle a JOB_ASSIGN_UNIQ packet.

Adds a WorkerJob to the internal queue to be picked up by any threads waiting in getJob().

Parameters:packet (Packet) – The Packet that was received.
handleNoJob(packet)[source]

Handle a NO_JOB packet.

Sends a PRE_SLEEP packet on the same connection.

Parameters:packet (Packet) – The Packet that was received.
handleNoop(packet)[source]

Handle a NOOP packet.

Sends a GRAB_JOB_UNIQ packet on the same connection. GRAB_JOB_UNIQ will return jobs regardless of whether they have been specified with a unique identifier when submitted. If they were not, then WorkerJob.unique attribute will be None.

Parameters:packet (Packet) – The Packet that was received.
handlePacket(packet)

Handle a received packet.

This method is called whenever a packet is received from any connection. It normally calls the handle method appropriate for the specific packet.

Parameters:packet (Packet) – The Packet that was received.
registerFunction(name, timeout=None)[source]

Register a function with Gearman.

If a timeout value is supplied, the function will be registered with CAN_DO_TIMEOUT.

Parameters:
  • name (str) – The name of the function to register.
  • timeout (numeric) – The timeout value (optional).
sendPacket(packet, connection)

Send a packet to a single connection, removing it from the list of active connections if that fails.

Parameters:
  • packet (Packet) – The Packet to send.
  • connection (Connection) – The Connection on which to send the packet.
setFunctions(functions)[source]

Replace the set of functions registered with Gearman.

Accepts a list of FunctionRecord objects which represents the complete set of functions that should be registered with Gearman. Any existing functions will be unregistered and these registered in their place. If the empty list is supplied, then the Gearman registered function set will be cleared.

Parameters:functions (list) – A list of FunctionRecord objects.
shutdown()

Close all connections and stop all running threads.

The object may no longer be used after shutdown is called.

stopWaitingForJobs()[source]

Interrupts all running getJob() calls, which will raise an exception.

unRegisterFunction(name)[source]

Remove a function from Gearman’s registry.

Parameters:name (str) – The name of the function to remove.
waitForServer()

Wait for at least one server to be connected.

Block until at least one gearman server is connected.

FunctionRecord Objects

class gear.FunctionRecord(name, timeout=None)[source]

Represents a function that should be registered with Gearman.

This class only directly needs to be instatiated for use with Worker.setFunctions(). If a timeout value is supplied, the function will be registered with CAN_DO_TIMEOUT.

Parameters:
  • name (str) – The name of the function to register.
  • timeout (numeric) – The timeout value (optional).

WorkerJob Objects

class gear.WorkerJob(handle, name, arguments, unique=None)[source]

A job that Gearman has assigned to a Worker. Not intended to be instantiated directly, but rather returned by Worker.getJob().

Parameters:
  • handle (str) – The job handle assigned by gearman.
  • name (str) – The name of the job.
  • arguments (bytes) – The opaque data blob passed to the worker as arguments.
  • unique (str) – A byte string to uniquely identify the job to Gearman (optional).

The following instance attributes are available:

name (str)
The name of the job.
arguments (bytes)
The opaque data blob passed to the worker as arguments.
unique (str or None)
The unique ID of the job (if supplied).
handle (bytes)
The Gearman job handle.
connection (Connection or None)
The connection associated with the job. Only set after the job has been submitted to a Gearman server.
sendWorkComplete(data='')[source]

Send a WORK_COMPLETE packet to the client.

Parameters:data (bytes) – The data to be sent to the client (optional).
sendWorkData(data='')[source]

Send a WORK_DATA packet to the client.

Parameters:data (bytes) – The data to be sent to the client (optional).
sendWorkException(data='')[source]

Send a WORK_EXCEPTION packet to the client.

Parameters:data (bytes) – The exception data to be sent to the client (optional).
sendWorkFail()[source]

Send a WORK_FAIL packet to the client.

sendWorkStatus(numerator, denominator)[source]

Send a WORK_STATUS packet to the client.

Sends a numerator and denominator that together represent the fraction complete of the job.

Parameters:
  • numerator (numeric) – The numerator of the fraction complete.
  • denominator (numeric) – The denominator of the fraction complete.
sendWorkWarning(data='')[source]

Send a WORK_WARNING packet to the client.

Parameters:data (bytes) – The data to be sent to the client (optional).

Administrative Protocol

Gearman provides an administrative protocol that is multiplexed on the same connection as the normal binary protocol for jobs. The classes in this section are useful for working with that protocol. They need to be used with an existing Connection object; either one obtained via a Client or Worker, or via direct instantiation of Connection to a Gearman server.

AdminRequest Objects

class gear.AdminRequest(*arguments)[source]

Encapsulates a request (and response) sent over the administrative protocol. This is a base class that may not be instantiated dircectly; a subclass implementing a specific command must be used instead.

Parameters:arguments (list) – A list of byte string arguments for the command.

The following instance attributes are available:

response (bytes)
The response from the server.
arguments (bytes)
The argument supplied with the constructor.
command (bytes)
The administrative command.
class gear.StatusAdminRequest[source]

A “status” administrative request.

The response from gearman may be found in the response attribute.

class gear.ShowJobsAdminRequest[source]

A “show jobs” administrative request.

The response from gearman may be found in the response attribute.

class gear.ShowUniqueJobsAdminRequest[source]

A “show unique jobs” administrative request.

The response from gearman may be found in the response attribute.

class gear.CancelJobAdminRequest(handle)[source]

A “cancel job” administrative request.

Parameters:handle (str) – The job handle to be canceled.

The response from gearman may be found in the response attribute.

class gear.VersionAdminRequest[source]

A “version” administrative request.

The response from gearman may be found in the response attribute.

Server Usage

A simple Gearman server is provided for convenience in unit testing, but is not designed for production use at scale. It takes no parameters other than the port number on which to listen.

Server Objects

class gear.Server(port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None)[source]

A simple gearman server implementation for testing (not for production use).

Parameters:port (int) – The TCP port on which to listen.
getQueue()[source]

Returns a copy of all internal queues in a flattened form.

Returns:The Gearman queue.
Return type:list of WorkerJob.
handlePacket(packet)

Handle a received packet.

This method is called whenever a packet is received from any connection. It normally calls the handle method appropriate for the specific packet.

Parameters:packet (Packet) – The Packet that was received.
shutdown()

Close all connections and stop all running threads.

The object may no longer be used after shutdown is called.

Common

These classes do not normally need to be directly instatiated to use the gear API, but they may be returned or otherwise be accessible from other classes in this module. They generally operate at a lower level, but still form part of the public API.

Connection Objects

class gear.Connection(host, port, ssl_key=None, ssl_cert=None, ssl_ca=None)[source]

A Connection to a Gearman Server.

connect()[source]

Open a connection to the server.

Raises ConnectionError:
 If unable to open the socket.
disconnect()[source]

Disconnect from the server and remove all associated state data.

echo(data=None, timeout=30)[source]

Perform an echo test on the server.

This method waits until the echo response has been received or the timeout has been reached.

Parameters:
  • data (bytes) – The data to request be echoed. If None, a random unique byte string will be generated.
  • timeout (numeric) – Number of seconds to wait until the response is received. If None, wait forever (default: 30 seconds).
Raises TimeoutError:
 

If the timeout is reached before the response is received.

readPacket()[source]

Read one packet or administrative response from the server.

Blocks until the complete packet or response is read.

Returns:The Packet or AdminRequest read.
Return type:Packet or AdminRequest
reconnect()[source]

Disconnect from and reconnect to the server, removing all associated state data.

sendAdminRequest(request, timeout=90)[source]

Send an administrative request to the server.

Parameters:
  • request (AdminRequest) – The AdminRequest to send.
  • timeout (numeric) – Number of seconds to wait until the response is received. If None, wait forever (default: 90 seconds).
Raises TimeoutError:
 

If the timeout is reached before the response is received.

sendPacket(packet)[source]

Send a packet to the server.

Parameters:packet (Packet) – The Packet to send.

Packet Objects

class gear.Packet(code, ptype, data, connection=None)[source]

A data packet received from or to be sent over a Connection.

Parameters:
  • code (bytes) – The Gearman magic code (constants.REQ or constants.RES)
  • ptype (bytes) – The packet type (one of the packet types in constasts).
  • data (bytes) – The data portion of the packet.
  • connection (Connection) – The connection on which the packet was received (optional).
Raises InvalidDataError:
 

If the magic code is unknown.

getArgument(index, last=False)[source]

Get the nth argument from the packet data.

Parameters:
  • index (int) – The argument index to look up.
  • last (bool) – Whether this is the last argument (and thus nulls should be ignored)
Returns:

The argument value.

Return type:

bytes

getJob()[source]

Get the Job associated with the job handle in this packet.

Returns:The Job for this packet.
Return type:Job
Raises UnknownJobError:
 If the job is not known.
toBinary()[source]

Return a Gearman wire protocol binary representation of the packet.

Returns:The packet in binary form.
Return type:bytes

Exceptions

exception gear.ConnectionError[source]
exception gear.InvalidDataError[source]
exception gear.ConfigurationError[source]
exception gear.NoConnectedServersError[source]
exception gear.UnknownJobError[source]
exception gear.InterruptedError[source]

Constants

These constants are used by public API classes.

gear.PRECEDENCE_NORMAL

Normal job precedence.

gear.PRECEDENCE_LOW

Low job precedence.

gear.PRECEDENCE_HIGH

High job precedence.

Protocol Constants

These are not necessary for normal API usage. See the Gearman protocol reference for an explanation of each of these.

Magic Codes

gear.constants.REQ

The Gearman magic code for a request.

gear.constants.RES

The Gearman magic code for a response.

Packet Types

gear.constants.CAN_DO
gear.constants.CANT_DO
gear.constants.RESET_ABILITIES
gear.constants.PRE_SLEEP
gear.constants.NOOP
gear.constants.SUBMIT_JOB
gear.constants.JOB_CREATED
gear.constants.GRAB_JOB
gear.constants.NO_JOB
gear.constants.JOB_ASSIGN
gear.constants.WORK_STATUS
gear.constants.WORK_COMPLETE
gear.constants.WORK_FAIL
gear.constants.GET_STATUS
gear.constants.ECHO_REQ
gear.constants.ECHO_RES
gear.constants.SUBMIT_JOB_BG
gear.constants.ERROR
gear.constants.STATUS_RES
gear.constants.SUBMIT_JOB_HIGH
gear.constants.SET_CLIENT_ID
gear.constants.CAN_DO_TIMEOUT
gear.constants.ALL_YOURS
gear.constants.WORK_EXCEPTION
gear.constants.OPTION_REQ
gear.constants.OPTION_RES
gear.constants.WORK_DATA
gear.constants.WORK_WARNING
gear.constants.GRAB_JOB_UNIQ
gear.constants.JOB_ASSIGN_UNIQ
gear.constants.SUBMIT_JOB_HIGH_BG
gear.constants.SUBMIT_JOB_LOW
gear.constants.SUBMIT_JOB_LOW_BG
gear.constants.SUBMIT_JOB_SCHED
gear.constants.SUBMIT_JOB_EPOCH

Indices and tables