1 // Copyright (c) 2013, Peter Wood.
2 // See license.txt for licensing details.
3 module stalkd.tube;
4 
5 import std.algorithm;
6 import std.conv;
7 import std.outbuffer;
8 import std.socket;
9 import std.string;
10 import std.typecons : Nullable;
11 import stalkd.connection;
12 import stalkd.exceptions;
13 import stalkd.job;
14 import stalkd.server;
15 
/**
 * This class models a tube within a Beanstalkd instance. There are two concepts
 * associated with tubes - watching and using. When you use a tube you alter the
 * target tube that new jobs get added to. When you watch a tube you are
 * indicating that you are interested in the jobs that have been added to it.
 * You can only use a single tube at any one time but you can watch multiple
 * tubes simultaneously.
 *
 * Note that the Connection object associated with a Tube instance should not
 * be shared with any other tubes. For this reason it's probably best practice
 * to create Tubes directly using the constructor that takes a Server object
 * as that will guarantee a new Connection for the Tube.
 *
 * Every request is written to the connection's socket and the matching
 * response is read back before the call returns. Failures reported by the
 * server, socket errors and malformed responses are all raised as
 * StalkdException.
 */
class Tube {
   /**
    * The name of the default tube if an explicit tube is not used.
    */
   static const DEFAULT_TUBE_NAME = "default";

   /**
    * The maximum length permitted for a tube name.
    */
   static const MAX_TUBE_NAME_LEN = 200;

   /**
    * The number of bytes requested from the socket when reading a response
    * status line.
    */
   private enum RECEIVE_CHUNK_SIZE = 100;

   /**
    * Constructor for the Tube class that creates a Tube object using the
    * 'default' tube on the server. The new object starts out using and
    * watching only the default tube.
    *
    * Params:
    *    connection =  The Connection object for the server.
    */
   this(Connection connection) {
      _connection      = connection;
      _using           = DEFAULT_TUBE_NAME;
      _watching.length = 1;
      _watching[0]     = DEFAULT_TUBE_NAME;
   }

   /**
    * Constructor for the Tube class that creates a Tube object using the
    * 'default' tube on the server. Use this method in preference to creating
    * a Tube using a Connection object as this guarantees a Connection dedicated
    * to the Tube.
    *
    * Params:
    *    server =  The Beanstalk server to create the Tube for.
    */
   this(Server server) {
      this(new Connection(server));
   }

   /**
    * Getter for the connection property.
    */
   @property Connection connection() {
      return(_connection);
   }

   /**
    * This function retrieves a string containing the name of the tube that
    * the Tube object is currently using.
    */
   @property string using() {
      return(_using);
   }

   /**
    * This function is simply an alias for a call to the use() function.
    */
   @property void using(string name) {
      use(name);
   }

   /**
    * This function returns a list of the names of the tubes that the Tube
    * object is currently watching. The list returned is a copy, altering it
    * has no effect on the Tube object.
    */
   @property string[] watching() {
      return(_watching.dup);
   }

   /**
    * This function alters the server tube that a Tube object uses.
    *
    * Params:
    *    name =  The name of the tube to be used. Note that this string has to
    *            conform to Beanstalkd tube naming rules.
    *
    * Throws:  StalkdException if the name is null, empty or longer than
    *          MAX_TUBE_NAME_LEN, or if the server does not answer USING.
    */
   void use(string name) {
      // A zero length check covers both null and empty names.
      if(name.length == 0) {
         throw(new StalkdException("Tube name not specified."));
      } else if(name.length > MAX_TUBE_NAME_LEN) {
         throw(new StalkdException("Tube name too long."));
      }
      send(null, "use", name);

      auto response = receive();
      if(!response.startsWith("USING")) {
         throw(new StalkdException("Server responded with a " ~ response ~ " error."));
      }
      _using = name;
   }

   /**
    * This function adds to the server tubes that a Tube object watches. Names
    * that are already being watched are skipped without contacting the server.
    *
    * Params:
    *    names =  An array of strings containing the names of the tubes to be
    *             watched.
    *
    * Throws:  StalkdException if the server does not answer WATCHING for one
    *          of the names. Names processed before the failure remain watched.
    */
   void watch(string[] names...) {
      foreach(name; names) {
         if(canFind(_watching, name)) {
            continue;
         }

         send(null, "watch", name);
         auto response = receive();
         if(!response.startsWith("WATCHING")) {
            throw(new StalkdException("Server responded with a " ~ response ~ " error."));
         }
         _watching ~= name;
      }
   }

   /**
    * This function removes one or more names from the server tubes that a Tube
    * object watches. Names that are not currently being watched are skipped
    * without contacting the server.
    *
    * Params:
    *    names =  An array of strings containing the names of the tubes to be
    *             ignored.
    *
    * Throws:  StalkdException if the server does not answer WATCHING for one
    *          of the names. Names processed before the failure remain ignored.
    */
   void ignore(string[] names...){
      foreach(name; names) {
         if(!canFind(_watching, name)) {
            continue;
         }

         send(null, "ignore", name);
         auto response = receive();
         if(!response.startsWith("WATCHING")) {
            throw(new StalkdException("Server responded with a " ~ response ~ " error."));
         }

         // Build a fresh list rather than editing the existing one in place.
         string[] remaining;
         foreach(entry; _watching) {
            if(entry != name) {
               remaining ~= entry;
            }
         }
         _watching = remaining;
      }
   }

   /**
    * This function adds a new job to the tube that is currently being used.
    *
    * Params:
    *    job =        A reference to the job to be added. Upon successful
    *                 addition of the job the object's id will be updated to
    *                 reflect the id given to it by Beanstalk and its tube
    *                 will be set to this Tube.
    *    delay =      The delay to be assigned to the new job. Defaults to
    *                 Job.DEFAULT_DELAY.
    *    priority =   The priority to be allocated to the new job. Defaults to
    *                 Job.DEFAULT_PRIORITY.
    *    timeToRun =  The time to run to be allocated to the new job. Defaults
    *                 to Job.DEFAULT_TIME_TO_RUN.
    *
    * Throws:  StalkdException if the server answers with anything other than
    *          INSERTED or if its answer cannot be parsed.
    */
   void put(ref Job job, uint delay=Job.DEFAULT_DELAY, uint priority=Job.DEFAULT_PRIORITY, uint timeToRun=Job.DEFAULT_TIME_TO_RUN) {
      auto data = job.data;

      // A put always carries a body section, even when the body is empty,
      // otherwise the server keeps waiting for the terminating CRLF.
      transmit(true, data, "put", priority, delay, timeToRun, data.length);

      auto response = receive();
      auto status   = response;
      uint jobId;

      // Responses that carry a job id have the form "<STATUS> <id>".
      auto offset = std.string.indexOf(response, " ");
      if(offset != -1) {
         status = response[0..offset];
         jobId  = parseNumber(response[(offset + 1)..$]);
      }

      switch(status) {
         case "INSERTED":
            job.id   = jobId;
            job.tube = this;
            break;

         case "BURIED":
            throw(new StalkdException("Server had insufficient memory to grow the priority queue. Job id " ~ to!string(jobId) ~ " was buried."));

         case "JOB_TOO_BIG":
            throw(new StalkdException("Job is too big."));

         case "DRAINING":
            throw(new StalkdException("Server is not accepting new jobs at this time."));

         case "EXPECTED_CRLF":
            throw(new StalkdException("Internal message structure error."));

         default:
            throw(new StalkdException("1. Server returned a " ~ status ~ " error."));
      }
   }

   /**
    * A blocking implementation of the reserve() method that will not return
    * until such time as a Job is available or an exception occurs.
    */
   Job reserve() {
      return(reserve(0).get());
   }

   /**
    * This function attempts to reserve a job from one of the tubes that a Tube
    * object is currently watching. Note that the return type for the function
    * is a Nullable!Job. This value will test as null if a Job did not become
    * available before the time out.
    *
    * Params:
    *    timeOut = The maximum number of seconds for the server to wait for a
    *              job to become available. If no job is available then the
    *              function will return null. If set to zero the function will
    *              block indefinitely (i.e. it won't time out).
    *
    * Throws:  StalkdException if the server answers with anything other than
    *          RESERVED or TIMED_OUT, or if the response cannot be read.
    */
   Nullable!Job reserve(uint timeOut) {
      Nullable!Job output;

      if(timeOut > 0) {
         send(null, "reserve-with-timeout", timeOut);
      } else {
         send(null, "reserve");
      }

      auto response = receiveHeader();
      if(response.startsWith("RESERVED")) {
         output = readJob(response);
      } else if(!response.startsWith("TIMED_OUT")) {
         throw(new StalkdException(to!string("2. Server returned a " ~ response.chomp() ~ " error.")));
      }

      return(output);
   }

   /**
    * This function attempts to kick buried jobs. If there are buried jobs then
    * Beanstalk will return them to a ready state. Failing that, if there are
    * any delayed jobs they will be kicked instead.
    *
    * Params:
    *    maximum =  The maximum number of jobs to kick. Defaults to 1.
    *
    * Returns:  The number of jobs reported as kicked by the server.
    *
    * Throws:  StalkdException if the server does not answer KICKED or the
    *          count in its answer cannot be parsed.
    */
   public uint kick(uint maximum=1) {
      send(null, "kick", maximum);
      auto response = receive();
      if(!response.startsWith("KICKED")) {
         throw(new StalkdException("Server responded with a " ~ response ~ " error."));
      }

      auto offset = std.string.indexOf(response, " ");
      if(offset == -1) {
         throw(new StalkdException("Unrecognised response received from server."));
      }
      return(parseNumber(response[(offset + 1)..$]));
   }

   /**
    * This function kicks a specific job if it is sitting in the buried or
    * delayed queues.
    *
    * Params:
    *    jobId =  The unique identifier of the job to kick.
    *
    * Throws:  StalkdException if the server does not answer KICKED.
    */
   public void kickJob(uint jobId) {
      send(null, "kick-job", jobId);
      auto response = receive();
      if(response != "KICKED") {
         throw(new StalkdException("Server responded with a " ~ response ~ " error."));
      }
   }

   /**
    * This function 'peeks' at the Beanstalk ready queue to see if there is a
    * job available. If there is a job it is returned. Note that peeking does
    * not reserve the job returned.
    *
    * Returns:  A Job if one is available, null otherwise.
    */
   public Job peek() {
      return(peekFor("ready"));
   }

   /**
    * This function 'peeks' at the Beanstalk delayed queue to see if there is a
    * job available. If there is a job it is returned. Note that peeking does
    * not reserve the job returned.
    *
    * Returns:  A Job if one is available, null otherwise.
    */
   public Job peekDelayed() {
      return(peekFor("delayed"));
   }

   /**
    * This function 'peeks' at the Beanstalk buried queue to see if there is a
    * job available. If there is a job it is returned. Note that peeking does
    * not reserve the job returned.
    *
    * Returns:  A Job if one is available, null otherwise.
    */
   public Job peekBuried() {
      return(peekFor("buried"));
   }

   /**
    * This function peeks at Beanstalk's contents to see if a job with a given
    * id exists. If it does it is returned. Note that peeking does not reserve
    * the job returned.
    *
    * Returns:  A Job if the job exists, null otherwise.
    *
    * Params:
    *    jobId =  The unique identifier of the job to peek for.
    */
   public Job peekForId(uint jobId) {
      return(doPeek("peek " ~ to!string(jobId)));
   }

   /**
    * This function deletes a specific job from Beanstalk.
    *
    * Params:
    *    jobId =  The unique identifier of the job to delete.
    *
    * Throws:  StalkdException if the server does not answer DELETED.
    */
   public void deleteJob(uint jobId) {
      send(null, "delete", jobId);
      auto response = receive();
      if(response != "DELETED") {
         throw(new StalkdException("3. Server returned a " ~ response ~ " error."));
      }
   }

   /**
    * This function releases a previously reserved job back to Beanstalk control.
    *
    * Params:
    *    jobId =     The unique identifier of the job to be released.
    *    delay =     The delay to be applied to the job when it is released
    *                back to Beanstalk. Defaults to Job.DEFAULT_DELAY.
    *    priority =  The priority to be applied to the job when it is released
    *                back to Beanstalk. Defaults to Job.DEFAULT_PRIORITY.
    *
    * Throws:  StalkdException if the server does not answer RELEASED.
    */
   public void releaseJob(uint jobId, uint delay=Job.DEFAULT_DELAY, uint priority=Job.DEFAULT_PRIORITY) {
      send(null, "release", jobId, priority, delay);
      auto response = receive();
      if(response == "BURIED") {
         throw(new StalkdException("Server had insufficient memory to grow its priority queue. Job id " ~ to!string(jobId) ~ " was buried."));
      } else if(response != "RELEASED") {
         throw(new StalkdException("4. Server returned a " ~ response ~ " error."));
      }
   }

   /**
    * This function buries a specified job. Note that you must have first
    * reserved the job before you can bury it.
    *
    * Params:
    *    jobId =     The unique identifier of the job to bury.
    *    priority =  The priority to assign to the job as part of burying it.
    *                Defaults to Job.DEFAULT_PRIORITY.
    *
    * Throws:  StalkdException if the server does not answer BURIED.
    */
   public void buryJob(uint jobId, uint priority=Job.DEFAULT_PRIORITY) {
      send(null, "bury", jobId, priority);
      auto response = receive();
      if(response != "BURIED") {
         throw(new StalkdException("5. Server returned a " ~ response ~ " error."));
      }
   }

   /**
    * This function touches a job, extending its time to run on the server. Note
    * that you must have first reserved the job before you can touch it.
    *
    * Params:
    *    jobId =  The unique identifier of the job to touch.
    *
    * Throws:  StalkdException if the server does not answer TOUCHED.
    */
   public void touchJob(uint jobId) {
      send(null, "touch", jobId);
      auto response = receive();
      if(response != "TOUCHED") {
         throw(new StalkdException("6. Server returned a " ~ response ~ " error."));
      }
   }

   /**
    * This function is used internally by the class to dispatch requests to
    * the Beanstalk server. A body section is only sent when data is non-empty.
    *
    * Params:
    *    data =       The data to be sent in the request. If null or empty
    *                 then no data is sent.
    *    parameters = The parameters to be prefixed to the data being sent.
    */
   private void send(T...)(in ubyte[] data, T parameters) {
      transmit(data.length > 0, data, parameters);
   }

   /**
    * This function assembles a request and writes all of it to the server
    * connection. The parameters are joined with single spaces to form the
    * command line, which is terminated with CRLF.
    *
    * Params:
    *    hasBody =    When true the data (which may be empty) and a closing
    *                 CRLF are written after the command line.
    *    data =       The bytes of the request body.
    *    parameters = The command name and its arguments.
    */
   private void transmit(T...)(bool hasBody, in ubyte[] data, T parameters) {
      OutBuffer buffer = new OutBuffer;
      string    request;
      uint      index;

      foreach(parameter; parameters) {
         if(index > 0) {
            request ~= " ";
         }
         request ~= to!string(parameter);
         index++;
      }
      request ~= "\r\n";
      buffer.reserve(request.length + data.length + 2);
      buffer.write(request);

      if(hasBody) {
         buffer.write(data);
         buffer.write("\r\n");
      }

      // A single send() call may accept only part of the packet, so keep
      // going until every byte has been handed to the socket.
      auto   packet = buffer.toBytes();
      size_t sent   = 0;
      while(sent < packet.length) {
         auto count = _connection.socket.send(packet[sent..$]);
         if(count == Socket.ERROR || count == 0) {
            throw(new StalkdException("Error sending data on server connection."));
         }
         sent += count;
      }
   }

   /**
    * This function is used internally by the class wherever a simple answer is
    * expected to a request.
    *
    * Returns:  A string containing the response value read. Note that the
    *           trailing line terminator will have been removed.
    */
   private string receive() {
      return(to!string(receiveHeader()).chomp());
   }

   /**
    * This function performs a single read from the server connection.
    *
    * Returns:  The bytes read, at least one and at most RECEIVE_CHUNK_SIZE.
    */
   private char[] receiveChunk() {
      char[] chunk = new char[RECEIVE_CHUNK_SIZE];
      auto   total = _connection.socket.receive(chunk);

      if(total == Socket.ERROR) {
         throw(new StalkdException("Error reading from server connection."));
      } else if(total == 0) {
         throw(new StalkdException("Connection to server unexpectedly terminated."));
      }

      return(chunk[0..total]);
   }

   /**
    * This function reads from the server connection until a complete status
    * line (one terminated by CRLF) has been received.
    *
    * Returns:  Everything read so far. This starts with the status line and
    *           may be followed by bytes that arrived in the same read (for
    *           example the beginning of a job body).
    */
   private char[] receiveHeader() {
      char[] response = receiveChunk();

      while(std.string.indexOf(response, "\r\n") == -1) {
         response ~= receiveChunk();
      }
      return(response);
   }

   /**
    * This function is used internally by the class to read job data from the
    * server connection. It keeps reading until the target has been completely
    * filled as a single read may return fewer bytes than requested.
    *
    * Params:
    *    target =  The storage to be filled with bytes from the connection.
    */
   private void readInJobData(ubyte[] target) {
      size_t filled = 0;

      while(filled < target.length) {
         auto count = _connection.socket.receive(target[filled..$]);
         if(count == Socket.ERROR) {
            throw(new StalkdException("Error retrieving response from server."));
         } else if(count == 0) {
            throw(new StalkdException("Server connection closed unexpectedly."));
         }
         filled += count;
      }
   }

   /**
    * This function converts a numeric field of a server response.
    *
    * Params:
    *    text =  The characters of the field.
    *
    * Throws:  StalkdException if the text is not a valid unsigned integer.
    */
   private static uint parseNumber(const(char)[] text) {
      try {
         return(to!uint(text));
      } catch(ConvException) {
         throw(new StalkdException("Unrecognised response received from server."));
      }
   }

   /**
    * This function turns a response of the form
    * "<STATUS> <id> <bytes>\r\n<data>\r\n" into a Job, reading whatever part
    * of the data has not yet been received from the server connection.
    *
    * Params:
    *    response =  The bytes received so far, which must contain at least
    *                the complete status line.
    *
    * Returns:  A new Job with its id, tube and data assigned.
    */
   private Job readJob(char[] response) {
      auto lineEnd = std.string.indexOf(response, "\r\n");
      auto idAt    = std.string.indexOf(response, " ");
      auto sizeAt  = (idAt == -1 ? -1 : std.string.indexOf(response, " ", cast(size_t)(idAt + 1)));

      // Both separators have to be part of the status line itself.
      if(lineEnd == -1 || idAt == -1 || sizeAt == -1 || sizeAt > lineEnd) {
         throw(new StalkdException("Unrecognised response received from server."));
      }

      uint   jobId = parseNumber(response[(idAt + 1)..sizeAt]);
      size_t size  = parseNumber(response[(sizeAt + 1)..lineEnd]);

      // The job data is always followed by a CRLF which has to be consumed
      // as well, otherwise it would be mistaken for the start of the next
      // response read from this connection.
      auto payload  = new ubyte[size + 2];
      auto leftover = cast(ubyte[])response[(lineEnd + 2)..$];
      auto filled   = min(leftover.length, payload.length);

      payload[0..filled] = leftover[0..filled];
      readInJobData(payload[filled..$]);

      auto job = new Job;
      job.id   = jobId;
      job.tube = this;
      job.write(payload[0..size]);
      return(job);
   }

   /**
    * This function is used internally by the class to check for available jobs
    * of a specified type.
    *
    * Params:
    *    type =  A string that should be either "ready", "delayed" or "buried".
    */
   private Job peekFor(string type) {
      return(doPeek("peek-" ~ type));
   }

   /**
    * This function performs a peek operation against the server.
    *
    * Params:
    *    request =  The request to be sent to the server.
    *
    * Returns:  The Job found or null if the server answered NOT_FOUND.
    *
    * Throws:  StalkdException if the server answers with anything other than
    *          FOUND or NOT_FOUND, or if the response cannot be read.
    */
   private Job doPeek(string request) {
      send(null, request);

      // The raw response must not be trimmed before it is parsed as the
      // bytes after the status line are job data.
      auto response = receiveHeader();
      if(response.startsWith("FOUND")) {
         return(readJob(response));
      } else if(!response.startsWith("NOT_FOUND")) {
         throw(new StalkdException(to!string("7. Server returned a " ~ response.chomp() ~ " error.")));
      }

      return(null);
   }

   private Connection _connection;
   private string     _using;
   private string[]   _watching;
}
607 
608 //------------------------------------------------------------------------------
609 // Unit Tests
610 //------------------------------------------------------------------------------
/*
 * NOTE: There is a limit to the amount of unit testing that can be performed
 *       without an actual server connection. For this reason, the tests below
 *       check for the presence of an available test Beanstalkd instance via
 *       the existence of the BEANSTALKD_TEST_HOST environment variable. If
 *       this is set then an attempt will be made to connect to it to perform
 *       an additional series of tests. You can specify the port for this test
 *       server using the BEANSTALKD_TEST_PORT environment variable. As the
 *       queues on this server will be added to, deleted from and cleared of
 *       content as part of the tests, this server should not be used for any
 *       other purpose!
 */
// Exercises the Tube class. The first three assertions check the state of a
// freshly constructed Tube; everything else only runs when the
// BEANSTALKD_TEST_HOST environment variable is set and talks to that server.
// The nested functions are closures over the local 'tube', 'tubeName' and
// 'job' variables, so reassigning those variables changes what the next
// invocation of a closure operates on. The steps depend on the state left
// behind by the steps before them and must stay in this order.
unittest {
   import core.thread;
   import core.time;
   import std.stdio;
   import std.conv;
   import std.process;
   import std.exception;
   import stalkd;

   // NOTE(review): this Connection is created even when no test host is
   // configured - presumably construction does not connect eagerly; confirm
   // against stalkd.connection.
   auto connection = new Connection("127.0.0.1");
   auto tube       = new Tube(connection);

   // A new Tube holds the connection it was given and both uses and watches
   // only the default tube.
   assert(tube.connection is connection);
   assert(tube.using is Tube.DEFAULT_TUBE_NAME);
   assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]);

   auto host = environment.get("BEANSTALKD_TEST_HOST");
   if(host !is null) {
      writeln("The BEANSTALKD_TEST_HOST environment variable is set, conducting advanced tests for the Tube class.");
      // BEANSTALKD_TEST_PORT, when present, overrides the default port.
      ushort port = Server.DEFAULT_BEANSTALKD_PORT;
      if(environment.get("BEANSTALKD_TEST_PORT") !is null) {
         port = to!ushort(environment.get("BEANSTALKD_TEST_PORT"));
      }
      connection = new Connection(host, port);

      string tubeName = "alternative";
      tube = new Tube(connection);

      // Each helper reads 'tubeName' at the time it is invoked.
      void useTube() {
         tube.use(tubeName);
      }

      void watchTube() {
         tube.watch(tubeName);
      }

      void ignoreTube() {
         tube.ignore(tubeName);
      }

      // Test: Use a tube name.
      assertNotThrown!StalkdException(useTube);
      assert(tube.using is tubeName);
      assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]);

      // Test: Use can switch between tube names multiple times.
      tubeName = Tube.DEFAULT_TUBE_NAME;
      assertNotThrown!StalkdException(useTube);
      assert(tube.using is tubeName);
      assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]);

      // Test: Watch a tube name.
      tubeName = "alternative";
      assertNotThrown!StalkdException(watchTube);
      assert(tube.using is Tube.DEFAULT_TUBE_NAME);
      assert(tube.watching == [Tube.DEFAULT_TUBE_NAME, tubeName]);

      // Test: Ignore a tube name.
      assertNotThrown!StalkdException(ignoreTube);
      assert(tube.using is Tube.DEFAULT_TUBE_NAME);
      assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]);

      // Test: The default tube name can be ignored.
      assertNotThrown!StalkdException(watchTube);
      tubeName = Tube.DEFAULT_TUBE_NAME;
      assertNotThrown!StalkdException(ignoreTube);
      assert(tube.using is Tube.DEFAULT_TUBE_NAME);
      assert(tube.watching == ["alternative"]);

      // Test: You can't unwatch all tubes.
      // "alternative" is the only tube still watched at this point, so the
      // ignore is expected to raise a StalkdException.
      tubeName = "alternative";
      assertThrown!StalkdException(ignoreTube);

      // Clear any existing content from the tube before starting.
      Job job;
      while((job = tube.peek()) !is null) {
         tube.deleteJob(job.id);
      }
      while((job = tube.peekBuried()) !is null) {
         tube.deleteJob(job.id);
      }

      // Put a job into a tube.
      // From here on a new Tube built from a Server is used, which creates
      // its own Connection.
      job  = new Job("Job data.");
      tube = new Tube(Server(host, port));
      void putJob() {
         tube.put(job);
      }
      assertNotThrown!StalkdException(putJob);

      // Test: Peek to see if the job is there.
      void peekJob() {
         job = tube.peek();
      }
      assertNotThrown!StalkdException(peekJob);
      assert(job !is null);
      assert(job.bodyAsString() == "Job data.");

      // Test: Reserve a job without timeout.
      void reserveJob() {
         job = tube.reserve();
      }
      assertNotThrown!StalkdException(reserveJob);
      assert(job.bodyAsString() == "Job data.");

      // Test: Releasing a job.
      void releaseJob() {
         tube.releaseJob(job.id);
      }
      assertNotThrown!StalkdException(releaseJob);

      // Test: Reserve a job from a tube with timeout.
      Nullable!Job reserved;
      void reserveJobWithTimeOut() {
         reserved = tube.reserve(3);
         if(!reserved.isNull) {
            job = reserved.get();
         }
      }
      assertNotThrown!StalkdException(reserveJobWithTimeOut);
      assert(!reserved.isNull);
      assert(job.bodyAsString() == "Job data.");
      assertNotThrown!StalkdException(releaseJob);

      // Test: Deleting a job.
      // The job is reserved again first, then deleted; the ready queue is
      // expected to be empty afterwards.
      void deleteJob() {
         tube.deleteJob(job.id);
      }
      assertNotThrown!StalkdException(reserveJob);
      assertNotThrown!StalkdException(deleteJob);
      assert(tube.peek() is null);

      // Test: Burying a job.
      void buryJob() {
         tube.buryJob(job.id);
      }
      void peekBuried() {
         job = tube.peekBuried();
      }
      job = new Job("A different set of job data.");
      assertNotThrown!StalkdException(putJob);
      assertNotThrown!StalkdException(reserveJob);
      assertNotThrown!StalkdException(buryJob);
      assertNotThrown!StalkdException(peekBuried);
      assert(job !is null);
      assert(job.bodyAsString() == "A different set of job data.");

      // Test: Kicking a job.
      // Up to 100 jobs may be kicked but only the single job buried above is
      // expected to be reported.
      auto kicked = 0;
      void kickJob() {
         kicked = tube.kick(100);
      }
      assertNotThrown!StalkdException(kickJob);
      assert(kicked == 1);
      assert(tube.peek() !is null);
      // 'job' still refers to the object returned by peekBuried() above.
      assertNotThrown!StalkdException(deleteJob);

      // Test: Touching a job.
      // putJob() passes the current 'job' object to tube.put() again, so its
      // body is what the assertions below expect.
      void touchJob() {
         tube.touchJob(job.id);
      }
      assertNotThrown!StalkdException(putJob);
      assertNotThrown!StalkdException(reserveJob);
      assertNotThrown!StalkdException(touchJob);
      assertNotThrown!StalkdException(releaseJob);
      assertNotThrown!StalkdException(peekJob);
      assert(job !is null);
      assert(job.bodyAsString() == "A different set of job data.");
      assertNotThrown!StalkdException(deleteJob);
   } else {
      writeln("The BEANSTALKD_TEST_HOST environment variable is not set, advanced tests for the Tube class skipped.");
   }
}