1 // Copyright (c) 2013, Peter Wood.
2 // See license.txt for licensing details.
3 module stalkd.job;
4 
5 import std.conv;
6 import std.outbuffer;
7 import stalkd.connection;
8 import stalkd.exceptions;
9 import stalkd.tube;
10 
/**
 * This class models a Beanstalkd job. At its lowest level a Job is really just
 * a collection of bytes, but this class provides some additional functionality
 * for interacting with jobs.
 */
class Job {
   /**
    * The default priority given to jobs.
    */
   static const DEFAULT_PRIORITY = 0;

   /**
    * The default delay applied to jobs.
    */
   static const DEFAULT_DELAY = 0;

   /**
    * The default time to run applied to jobs.
    */
   static const DEFAULT_TIME_TO_RUN = 180;

   /**
    * Default constructor for the Job class. Creates a job with an empty body.
    */
   this() {
      _buffer = new OutBuffer;
   }

   /**
    * Constructor for the Job class that allows it to handle strings of any
    * type.
    *
    * Params:
    *    data =  A string (or wstring or dstring) containing the job data.
    */
   this(T)(T[] data) {
      _buffer = new OutBuffer;
      append(data);
   }

   /**
    * Getter for the job id property. Note that this property will only be valid
    * for jobs that have been added to Beanstalkd or extracted from Beanstalkd.
    * A Job counts as such once it has been associated with a Tube.
    *
    * Returns:  The id assigned to the job.
    *
    * Throws:  StalkdException if the job has no associated tube.
    */
   const @property uint id() {
      if(_tube is null) {
         throw new StalkdException("Job doesn't yet possess a Beanstalkd id.");
      }

      return _id;
   }

   /**
    * Setter for the job id property (visible only in package).
    */
   @property package void id(uint id) {
      _id = id;
   }

   /**
    * This function returns the Tube that a job was either put on or reserved
    * from. If the Job hasn't been put, reserved or peeked then this will be
    * null.
    */
   @property Tube tube() {
      return _tube;
   }

   /**
    * Tube property setter, only accessible within the package.
    */
   @property package void tube(Tube tube) {
      _tube = tube;
   }

   /**
    * This function provides access to the raw data associated with a Job
    * object.
    *
    * Returns:  An array of ubytes containing the Job data. This is whatever
    *           OutBuffer.toBytes() yields; per the Phobos documentation that
    *           is a view onto the buffer rather than a copy, so callers
    *           should not modify it.
    */
   @property ubyte[] data() {
      return _buffer.toBytes();
   }

   /**
    * This function appends an array of ubytes to the data stored within a Job
    * object. Empty arrays are ignored.
    *
    * Params:
    *    data =  The array of data to be written to the Job. The bytes are
    *            only read, never modified.
    */
   package void write(const(ubyte)[] data) {
      if(data.length > 0) {
         // OutBuffer.write() grows the buffer itself, so no explicit
         // reserve() call is needed beforehand.
         _buffer.write(data);
      }
   }

   /**
    * This function appends a string (or wstring or dstring) to the data stored
    * within a Job. The characters are stored as their raw in-memory bytes.
    *
    * Params:
    *    data =  The string to be written into the Job.
    */
   void append(T)(T[] data) {
      // Cast to const rather than mutable bytes so that immutable string data
      // never has its immutability cast away.
      write(cast(const(ubyte)[])data);
   }

   /**
    * Fetches the body of the job, converting it to a string in the process.
    */
   string bodyAsString() {
      return to!string(cast(char[])this.data);
   }

   /**
    * Fetches the body of the job, converting it to a dstring in the process.
    *
    * Throws:  StalkdException if the body length is not a whole number of
    *          dchars.
    */
   dstring bodyAsDString() {
      auto bytes = this.data;
      // An array cast to a wider element type fails with a runtime error when
      // the byte count doesn't divide evenly, so check up front.
      if(bytes.length % dchar.sizeof != 0) {
         throw new StalkdException("Job body length is not a multiple of 4 bytes and cannot be read as a dstring.");
      }
      return to!dstring(cast(dchar[])bytes);
   }

   /**
    * Fetches the body of the job, converting it to a wstring in the process.
    *
    * Throws:  StalkdException if the body length is not a whole number of
    *          wchars.
    */
   wstring bodyAsWString() {
      auto bytes = this.data;
      // See bodyAsDString() for why the length is checked before casting.
      if(bytes.length % wchar.sizeof != 0) {
         throw new StalkdException("Job body length is not a multiple of 2 bytes and cannot be read as a wstring.");
      }
      return to!wstring(cast(wchar[])bytes);
   }

   /**
    * This function deletes a job from Beanstalkd. Note that this function can
    * only be called on Jobs that have been put into or reserved out of a
    * Beanstalkd server.
    *
    * Throws:  StalkdException if the job has no associated tube.
    */
   void destroy() {
      requireTube("deleted").deleteJob(_id);
   }

   /**
    * This function releases a job back to Beanstalkd. Note that only reserved
    * jobs can be released.
    *
    * Params:
    *    delay =     The delay to be applied to the job as part of its release.
    *                This defaults to DEFAULT_DELAY.
    *    priority =  The priority to be given to the job as it is released. This
    *                defaults to DEFAULT_PRIORITY.
    *
    * Throws:  StalkdException if the job has no associated tube.
    */
   void release(uint delay=DEFAULT_DELAY, uint priority=DEFAULT_PRIORITY) {
      requireTube("released").releaseJob(_id, delay, priority);
   }

   /**
    * This function buries a job on Beanstalkd. Note that only reserved jobs
    * can be buried.
    *
    * Params:
    *    priority =  The priority to be assigned to the buried job. Defaults to
    *                DEFAULT_PRIORITY.
    *
    * Throws:  StalkdException if the job has no associated tube.
    */
   void bury(uint priority= DEFAULT_PRIORITY) {
      requireTube("buried").buryJob(_id, priority);
   }

   /**
    * This function touches a job on Beanstalkd, extending its time to run.
    * Note that only reserved jobs can be touched.
    *
    * Throws:  StalkdException if the job has no associated tube.
    */
   void touch() {
      requireTube("touched").touchJob(_id);
   }

   /**
    * Returns the tube the job is associated with, or throws if there is none.
    *
    * Params:
    *    action =  Past participle describing the attempted operation; it is
    *              used to complete the exception message.
    *
    * Throws:  StalkdException if the job has no associated tube.
    */
   private Tube requireTube(string action) {
      if(_tube is null) {
         throw new StalkdException("Job is not associated with a tube and cannot be " ~ action ~ ".");
      }
      return _tube;
   }

   private uint      _id;
   private OutBuffer _buffer;
   private Tube      _tube;
}
203 
204 //------------------------------------------------------------------------------
205 // Unit Tests
206 //------------------------------------------------------------------------------
/*
 * NOTE: There is a limit to the amount of unit testing that can be performed
 *       without an actual server connection. For this reason, the tests below
 *       check for the presence of an available test Beanstalkd instance via
 *       the existence of the BEANSTALKD_TEST_HOST environment variable. If
 *       this is set then an attempt will be made to connect to it to perform
 *       an additional series of tests. You can specify the port for this test
 *       server using the BEANSTALKD_TEST_PORT environment variable. As the
 *       queues on this server will be added to, deleted from and cleared of
 *       content as part of the tests this server should not be used for any
 *       other purpose!
 */
unittest {
   import std.exception;
   import std.process;
   import std.stdio;
   import std.string;
   import stalkd;

   auto job = new Job("Test data.");

   // Small wrappers so each operation can be handed to assertThrown and
   // assertNotThrown. They read the 'job' variable at call time, so they
   // always act on whichever Job it currently refers to.
   void getId() {
      job.id;
   }
   void callBury() {
      job.bury();
   }
   void callDestroy() {
      job.destroy();
   }
   void callRelease() {
      job.release();
   }
   void callTouch() {
      job.touch();
   }

   // Test: Basic stuff.
   assertThrown!StalkdException(getId);
   assert(job.tube is null);
   assert(job.data == "Test data.".representation);
   assert(job.bodyAsString() == "Test data.");

   job.append(" Extra content.");
   assert(job.data == "Test data. Extra content.".representation);
   assert(job.bodyAsString() == "Test data. Extra content.");

   // Test: Operations when a tube hasn't been set.
   assertThrown!StalkdException(callBury);
   assertThrown!StalkdException(callDestroy);
   assertThrown!StalkdException(callRelease);
   assertThrown!StalkdException(callTouch);

   auto host = environment.get("BEANSTALKD_TEST_HOST");
   if(host !is null) {
      writeln("The BEANSTALKD_TEST_HOST environment variable is set, conducting advanced tests for the Job class.");
      ushort port = Server.DEFAULT_BEANSTALKD_PORT;
      // Read the port override once rather than querying the environment
      // twice.
      auto portSetting = environment.get("BEANSTALKD_TEST_PORT");
      if(portSetting !is null) {
         port = to!ushort(portSetting);
      }
      auto connection = new Connection(host, port);
      auto tube       = new Tube(connection);

      // Clear any existing content from the tube before starting.
      while((job = tube.peek()) !is null) {
         tube.deleteJob(job.id);
      }
      while((job = tube.peekBuried()) !is null) {
         tube.deleteJob(job.id);
      }

      // Test: Job id is accessible.
      job = new Job("Example test job.");
      tube.put(job);
      assertNotThrown!StalkdException(getId);

      // Test: Releasing a job.
      job = tube.reserve(3).get();
      assert(job !is null);
      assertNotThrown!StalkdException(callRelease);
      assert(tube.peek() !is null);

      // Test: Burying a job.
      job = tube.reserve(3).get();
      assert(job !is null);
      assertNotThrown!StalkdException(callBury);
      assert(tube.peek() is null);
      assert(tube.peekBuried() !is null);

      // Test: Destroying a job.
      assert(tube.kick() == 1);
      assert(tube.peek() !is null);
      // Unwrap the reserve() result explicitly, as the earlier reserve calls
      // do, instead of relying on an implicit conversion to Job.
      job = tube.reserve().get();
      assert(job !is null);
      assertNotThrown!StalkdException(callDestroy);
      assert(tube.peek() is null);
   } else {
      writeln("The BEANSTALKD_TEST_HOST environment variable is not set, advanced tests for the Job class skipped.");
   }
}