// Copyright (c) 2013, Peter Wood.
// See license.txt for licensing details.
module stalkd.job;

import std.conv;
import std.outbuffer;
import stalkd.connection;
import stalkd.exceptions;
import stalkd.tube;

/**
 * This class models a Beanstalkd job. At its lowest level a Job is really just
 * a collection of bytes, but this class provides some additional functionality
 * for interacting with jobs.
 */
class Job {
    /**
     * The default priority given to jobs.
     */
    static const DEFAULT_PRIORITY = 0;

    /**
     * The default delay applied to jobs.
     */
    static const DEFAULT_DELAY = 0;

    /**
     * The default time to run applied to jobs.
     */
    static const DEFAULT_TIME_TO_RUN = 180;

    /**
     * Default constructor for the Job class. Creates a job with an empty body.
     */
    this() {
        _buffer = new OutBuffer;
    }

    /**
     * Constructor for the Job class that allows it to handle strings of any
     * type.
     *
     * Params:
     *    data = A string (or wstring or dstring) containing the job data.
     */
    this(T)(T[] data) {
        _buffer = new OutBuffer;
        append(data);
    }

    /**
     * Getter for the job id property. Note that this property will only be
     * valid for jobs that have been added to Beanstalk or extracted from
     * Beanstalk.
     *
     * Returns: The id assigned to the job.
     *
     * Throws: StalkdException if the job has no tube associated with it.
     */
    @property uint id() const {
        // A job only has a meaningful id once a tube has been attached to it.
        if(_tube is null) {
            throw(new StalkdException("Job doesn't yet possess a Beanstald id."));
        }

        return(_id);
    }

    /**
     * Setter for the job id property (visible only in package).
     */
    @property package void id(uint id) {
        _id = id;
    }

    /**
     * This function returns the Tube that a job was either put on or reserved
     * from. If the Job hasn't been put, reserved or peeked then this will be
     * null.
     */
    @property Tube tube() {
        return(_tube);
    }

    /**
     * Tube property setter, only accessible within the package.
     */
    @property package void tube(Tube tube) {
        _tube = tube;
    }

    /**
     * This function provides access to the data associated with a Job object.
     *
     * Returns: An array of ubytes containing the Job data, as produced by
     *          OutBuffer.toBytes().
     */
    @property ubyte[] data() {
        // NOTE(review): OutBuffer.toBytes() appears to hand back a slice of
        // the buffer's own storage rather than a copy - callers should
        // probably treat the result as read-only. Confirm against Phobos.
        return(_buffer.toBytes());
    }

    /**
     * This function appends an array of ubytes to the data stored within a Job
     * object. Empty arrays are ignored.
     *
     * Params:
     *    data = The array of data to be written to the Job.
     */
    package void write(ubyte[] data) {
        if(data.length > 0) {
            // OutBuffer.write() grows the buffer as needed, so there is no
            // need to reserve space separately beforehand.
            _buffer.write(data);
        }
    }

    /**
     * This function appends a string (or wstring or dstring) to the data stored
     * within a Job. The characters are stored as their raw in-memory bytes, no
     * transcoding is performed.
     *
     * Params:
     *    data = The string to be written into the Job.
     */
    void append(T)(T[] data) {
        write(cast(ubyte[])data);
    }

    /**
     * Fetches the body of the job, converting it to a string in the process.
     */
    string bodyAsString() {
        return(decodeBody!char());
    }

    /**
     * Fetches the body of the job, converting it to a dstring in the process.
     *
     * Throws: StalkdException if the size of the body in bytes is not a
     *         multiple of the size of a dchar.
     */
    dstring bodyAsDString() {
        return(decodeBody!dchar());
    }

    /**
     * Fetches the body of the job, converting it to a wstring in the process.
     *
     * Throws: StalkdException if the size of the body in bytes is not a
     *         multiple of the size of a wchar.
     */
    wstring bodyAsWString() {
        return(decodeBody!wchar());
    }

    /**
     * This function deletes a job from Beanstalk. Note that this function can
     * only be called on Jobs that have been put into or reserved out of a
     * Beanstalk server.
     *
     * Throws: StalkdException if the job has no tube associated with it.
     */
    void destroy() {
        if(_tube is null) {
            throw(new StalkdException("Job is not associated with a tube and cannot be deleted."));
        }
        _tube.deleteJob(_id);
    }

    /**
     * This function releases a job back to Beanstalk. Note that only reserved
     * jobs can be released.
     *
     * Params:
     *    delay =    The delay to be applied to the job as part of its release.
     *               This defaults to DEFAULT_DELAY.
     *    priority = The priority to be given to the job as it is released. This
     *               defaults to DEFAULT_PRIORITY.
     *
     * Throws: StalkdException if the job has no tube associated with it.
     */
    void release(uint delay=DEFAULT_DELAY, uint priority=DEFAULT_PRIORITY) {
        if(_tube is null) {
            throw(new StalkdException("Job is not associated with a tube and cannot be released."));
        }
        _tube.releaseJob(_id, delay, priority);
    }

    /**
     * This function buries a job on Beanstalk. Note that only reserved jobs can
     * be buried.
     *
     * Params:
     *    priority = The priority to be assigned to the buried job. Defaults to
     *               DEFAULT_PRIORITY.
     *
     * Throws: StalkdException if the job has no tube associated with it.
     */
    void bury(uint priority=DEFAULT_PRIORITY) {
        if(_tube is null) {
            throw(new StalkdException("Job is not associated with a tube and cannot be buried."));
        }
        _tube.buryJob(_id, priority);
    }

    /**
     * This function touches a job on Beanstalk, extending its time to run. Note
     * that only reserved jobs can be touched.
     *
     * Throws: StalkdException if the job has no tube associated with it.
     */
    void touch() {
        if(_tube is null) {
            throw(new StalkdException("Job is not associated with a tube and cannot be touched."));
        }
        _tube.touchJob(_id);
    }

    /**
     * Reinterprets the job body as an array of the given character type and
     * returns an immutable copy of it.
     *
     * Casting a byte array whose length is not a multiple of the element size
     * fails at run time with an array cast misalignment error, so the length
     * is checked first and reported as a StalkdException instead.
     */
    private immutable(C)[] decodeBody(C)() {
        auto bytes = this.data;

        if(bytes.length % C.sizeof != 0) {
            throw(new StalkdException("Job body of " ~ to!string(bytes.length) ~
                                      " bytes cannot be read as " ~ C.stringof ~
                                      " data, its length is not a multiple of " ~
                                      to!string(C.sizeof) ~ "."));
        }

        return(to!(immutable(C)[])(cast(C[])bytes));
    }

    private uint      _id;
    private OutBuffer _buffer;
    private Tube      _tube;
}

//------------------------------------------------------------------------------
// Unit Tests
//------------------------------------------------------------------------------
/*
 * NOTE: There is a limit to the amount of unit testing that can be performed
 *       without an actual server connection. For this reason, the tests below
 *       check for the presence of an available test Beanstalkd instance via
 *       the existence of the BEANSTALKD_TEST_HOST environment variable. If
 *       this is set then an attempt will be made to connect to it to perform
 *       an additional series of tests. You can specify the port for this test
 *       server using the BEANSTALKD_TEST_PORT environment variable. As the
 *       queues on this server will be added to, deleted from and cleared of
 *       content as part of the tests this server should not be used for any
 *       other purpose!
 */
unittest {
    import std.exception;
    import std.process;
    import std.stdio;
    import std.string;
    import stalkd;

    auto job = new Job("Test data.");

    // These helpers refer to the 'job' variable itself, so they always act on
    // whichever Job is currently assigned to it further down.
    void getId() {
        job.id;
    }
    void callBury() {
        job.bury();
    }
    void callDestroy() {
        job.destroy();
    }
    void callRelease() {
        job.release();
    }
    void callTouch() {
        job.touch();
    }

    // Test: Basic stuff.
    assertThrown!StalkdException(getId);
    assert(job.tube is null);
    assert(job.data == "Test data.".representation);
    assert(job.bodyAsString() == "Test data.");

    job.append(" Extra content.");
    assert(job.data == "Test data. Extra content.".representation);
    assert(job.bodyAsString() == "Test data. Extra content.");

    // Test: Operations when a tube hasn't been set.
    assertThrown!StalkdException(callBury);
    assertThrown!StalkdException(callDestroy);
    assertThrown!StalkdException(callRelease);
    assertThrown!StalkdException(callTouch);

    // Test: Wide string bodies survive a round trip.
    assert((new Job("Wide data."w)).bodyAsWString() == "Wide data."w);
    assert((new Job("Wide data."d)).bodyAsDString() == "Wide data."d);

    // Test: A body that isn't a whole number of wide characters is rejected.
    auto oddJob = new Job("abc");
    assert(oddJob.bodyAsString() == "abc");
    assertThrown!StalkdException(oddJob.bodyAsWString());
    assertThrown!StalkdException(oddJob.bodyAsDString());

    auto host = environment.get("BEANSTALKD_TEST_HOST");
    if(host !is null) {
        writeln("The BEANSTALKD_TEST_HOST environment variable is set, conducting advanced tests for the Job class.");
        ushort port        = Server.DEFAULT_BEANSTALKD_PORT;
        auto   portSetting = environment.get("BEANSTALKD_TEST_PORT");
        if(portSetting !is null) {
            port = to!ushort(portSetting);
        }
        auto connection = new Connection(host, port);
        auto tube       = new Tube(connection);

        // Clear any existing content from the tube before starting.
        while((job = tube.peek()) !is null) {
            tube.deleteJob(job.id);
        }
        while((job = tube.peekBuried()) !is null) {
            tube.deleteJob(job.id);
        }

        // Test: Job id is accessible.
        job = new Job("Example test job.");
        tube.put(job);
        assertNotThrown!StalkdException(getId);

        // Test: Releasing a job.
        job = tube.reserve(3).get();
        assert(job !is null);
        assertNotThrown!StalkdException(callRelease);
        assert(tube.peek() !is null);

        // Test: Burying a job.
        job = tube.reserve(3).get();
        assert(job !is null);
        assertNotThrown!StalkdException(callBury);
        assert(tube.peek() is null);
        assert(tube.peekBuried() !is null);

        // Test: Destroying a job.
        assert(tube.kick() == 1);
        assert(tube.peek() !is null);
        job = tube.reserve();
        assert(job !is null);
        assertNotThrown!StalkdException(callDestroy);
        assert(tube.peek() is null);
    } else {
        writeln("The BEANSTALKD_TEST_HOST environment variable is not set, advanced tests for the Job class skipped.");
    }
}