1 // Copyright (c) 2013, Peter Wood. 2 // See license.txt for licensing details. 3 module stalkd.tube; 4 5 import std.algorithm; 6 import std.conv; 7 import std.outbuffer; 8 import std.socket; 9 import std.string; 10 import std.typecons : Nullable; 11 import stalkd.connection; 12 import stalkd.exceptions; 13 import stalkd.job; 14 import stalkd.server; 15 16 /** 17 * This class models a tube within a Beanstalkd instance. There are two concepts 18 * associated with Tubes - watching and using. When you use a tube you alter the 19 * target tube that new jobs get added to. When you watch a tube you are 20 * indicating that you are interested in the jobs that have been added to it. 21 * You can only use a single tube at any one time but you can watch multiple 22 * tubes simultaneously. 23 * 24 * Note that the Connection object associated with a Tube instance should not 25 * be shared with any other tubes. For this reason it's probably best practice 26 * to create Tube's directly using the constructor that takes a Server object 27 * as that will guarantee a new Connection for the Tube. 28 */ 29 class Tube { 30 /** 31 * The name of the default tube if an explicit tube is not used. 32 */ 33 static const DEFAULT_TUBE_NAME = "default"; 34 35 /** 36 * The maximum length permitted for a tube name. 37 */ 38 static const MAX_TUBE_NAME_LEN = 200; 39 40 /** 41 * Constructor for the Tube class that creates a Tube object using the 42 * 'default' tube on the server. 43 * 44 * Params: 45 * connection = The Connection object for the server. 46 */ 47 this(Connection connection) { 48 _connection = connection; 49 _using = DEFAULT_TUBE_NAME; 50 _watching.length = 1; 51 _watching[0] = DEFAULT_TUBE_NAME; 52 } 53 54 /** 55 * Constructor for the Tube class that creates a Tube object using the 56 * 'default' tube on the server. Use this method in preference to creating 57 * a Tube using a Connection object as this guarantees a Connection dedicated 58 * to the Tube. 59 * 60 * Params: 61 * server = The Beanstalk server to create the Tube for. 62 */ 63 this(Server server) { 64 this(new Connection(server)); 65 } 66 67 /** 68 * Getter for the connection property. 69 */ 70 @property Connection connection() { 71 return(_connection); 72 } 73 74 /** 75 * This function retrieves a string containing the name of the tube that 76 * the Tube object is currently using. 77 */ 78 @property string using() { 79 return(_using); 80 } 81 82 /** 83 * This function is simply an alias for a call to the use() function. 84 */ 85 @property void using(string name) { 86 use(name); 87 } 88 89 /** 90 * This function returns a list of the name for the tubes that the Tube 91 * object is currently watching. 92 */ 93 @property string[] watching() { 94 return(_watching.dup); 95 } 96 97 /** 98 * This function alters the server tube that a Tube object uses. 99 * 100 * Params: 101 * name = The name of the tube to be used. Note that this string has to 102 * conform to Beanstalkd tube naming rules. 103 */ 104 void use(string name) { 105 if(name is null) { 106 throw(new StalkdException("Tube name not specified.")); 107 } else if(name.length > MAX_TUBE_NAME_LEN) { 108 throw(new StalkdException("Tube name too long.")); 109 } 110 send(null, "use", name); 111 112 auto response = receive(); 113 if(!response.startsWith("USING")) { 114 response = response.chomp(); 115 throw(new StalkdException(to!string("Server responded with a " ~ response ~ " error."))); 116 } 117 _using = name; 118 } 119 120 /** 121 * This function adds to the server tubes that a Tube object watches. 122 * 123 * Params: 124 * names = An array of strings containing the names of the tubes to be 125 * watched. Invalid tube names will be completely ignored. 126 */ 127 void watch(string[] names...) { 128 foreach(name; names) { 129 if(find(_watching, name).empty) { 130 send(null, "watch", name); 131 auto response = receive(); 132 if(!response.startsWith("WATCHING")) { 133 response = response.chomp(); 134 throw(new StalkdException(to!string("Server responded with a " ~ response ~ " error."))); 135 } 136 _watching ~= name; 137 } 138 } 139 } 140 141 /** 142 * This function removes one or more names from the server tubes that a Tube 143 * object watches. 144 * 145 * Params: 146 * names = An array of strings containing the names of the tubes to be 147 * ignored. Invalid tube names will be completely ignored. 148 */ 149 void ignore(string[] names...){ 150 foreach(name; names) { 151 if(!find(_watching, name).empty) { 152 send(null, "ignore", name); 153 auto response = receive(); 154 if(!response.startsWith("WATCHING")) { 155 response = response.chomp(); 156 throw(new StalkdException(to!string("Server responded with a " ~ response ~ " error."))); 157 } 158 159 string[] remaining; 160 foreach(entry; _watching) { 161 if(entry != name) { 162 remaining ~= entry; 163 } 164 } 165 _watching = remaining; 166 } 167 } 168 } 169 170 /** 171 * This function adds a new job to the tube that is currently being used. 172 * 173 * Params: 174 * job = A reference to the job to be added. Upin successful 175 * addition of the job the objects id will be updated to 176 * reflect the id given to it by Beanstalk. 177 * delay = The delay to be assigned to the new job. Defaults to 178 * Job.DEFAULT_DELAY. 179 * priority = The priority to be allocated to the new job. Defaults to 180 * Job.DEFAULT_PRIORITY. 181 * timeToRun = The time to run to be allocated to the new job. Defaults 182 * to Job.DEFAULT_TIME_TO_RUN. 183 */ 184 void put(ref Job job, uint delay=Job.DEFAULT_DELAY, uint priority=Job.DEFAULT_PRIORITY, uint timeToRun=Job.DEFAULT_TIME_TO_RUN) { 185 uint jobId; 186 auto data = job.data; 187 188 send(data, "put", priority, delay, timeToRun, data.length); 189 190 auto response = receive(); 191 auto offset = std..string.indexOf(response, " "); 192 if(offset != -1) { 193 jobId = to!uint(response[++offset..$]); 194 response = response[0..offset].stripRight(); 195 } 196 197 if(response != "INSERTED") { 198 StalkdException exception; 199 200 if(response == "BURIED") { 201 throw(new StalkdException(to!string("Server had insufficient memory to grow the priority queue. Job id " ~ to!string(jobId) ~ " was buried."))); 202 } else if(response == "JOB_TOO_BIG") { 203 throw(new StalkdException("Job is too big.")); 204 } else if(response == "DRAINING") { 205 throw(new StalkdException("Server is not accepting new jobs at this time.")); 206 } else if(response == "EXPECTED_CRLF") { 207 throw(new StalkdException("Internal message structure error.")); 208 } else { 209 throw(new StalkdException(to!string("1. Server returned a " ~ response ~ " error."))); 210 } 211 } else { 212 job.id = jobId; 213 job.tube = this; 214 } 215 } 216 217 /** 218 * A blocking implementation of the reserve() method that will not return 219 * until such time as a Job is available or an exception occurs. 220 */ 221 Job reserve() { 222 return(reserve(0).get()); 223 } 224 225 /** 226 * This function attempts to reserve a job from one of the tubes that a Tube 227 * object is currently watching. Note that the return type for the function 228 * is a Nullable!Job. This value will test as null if a Job did not become 229 * available before the time out. 230 * 231 * Params: 232 * timeOut = The maximum number of seconds for the server to wait for a 233 * job to become available. If no job is available then the 234 * function will return null. If set to zero the function will 235 * block indefinitely (i.e. it won't time out). 236 */ 237 Nullable!Job reserve(uint timeOut) { 238 Nullable!Job output; 239 char[] response = new char[100]; 240 241 if(timeOut > 0) { 242 send(null, "reserve-with-timeout", timeOut); 243 } else { 244 send(null, "reserve"); 245 } 246 247 auto total = _connection.socket.receive(response); 248 if(total == Socket.ERROR) { 249 throw(new StalkdException("Error reading from server connection.")); 250 } else if(total == 0) { 251 throw(new StalkdException("Connection to server unexpectedly terminated.")); 252 } 253 response = response[0..total]; 254 255 if(response.startsWith("RESERVED")) { 256 uint jobId; 257 ulong read, 258 size; 259 size_t[] offsets = [0, 0, 0]; 260 OutBuffer buffer; 261 262 offsets[0] = std..string.indexOf(response, " "); 263 offsets[1] = std..string.indexOf(response, " ", (offsets[0] + 1)); 264 offsets[2] = std..string.indexOf(response, "\r\n", (offsets[1] + 1)); 265 if(!offsets.find(-1).empty) { 266 throw(new StalkdException("Unrecognised response received from server.")); 267 } 268 269 jobId = to!uint(response[(offsets[0] + 1)..offsets[1]]); 270 size = to!uint(response[(offsets[1] + 1)..offsets[2]]); 271 read = response.length - (offsets[2] + 2); 272 buffer = new OutBuffer; 273 buffer.reserve(cast(uint)size); 274 275 if(read > 0) { 276 auto endPoint = response.length, 277 available = endPoint - (offsets[2] + 2); 278 279 while(available > size) { 280 endPoint--; 281 available = endPoint - (offsets[2] + 2); 282 } 283 284 buffer.write(response[(offsets[2] + 2)..endPoint]); 285 } 286 if(size > read) { 287 readInJobData(buffer, cast(uint)(size - read)); 288 } 289 290 auto job = new Job; 291 job.id = jobId; 292 job.tube = this; 293 job.write(buffer.toBytes()); 294 output = job; 295 } else if(!response.startsWith("TIMED_OUT")) { 296 response.chomp(); 297 throw(new StalkdException(to!string("2. Server returned a " ~ response ~ " error."))); 298 } 299 300 return(output); 301 } 302 303 /** 304 * This function attempts to kick buried jobs. If there are buried jobs then 305 * Beanstalk will return them to a ready state. Failing that, if there are 306 * any delayed jobs they will be kicked instead. 307 * 308 * Params: 309 * maximum = The maximum number of jobs to kick. Defaults to 1. 310 */ 311 public uint kick(uint maximum=1) { 312 uint total; 313 314 send(null, "kick", maximum); 315 auto response = receive(); 316 if(response.startsWith("KICKED")) { 317 total = to!uint(response[(std..string.indexOf(response, " ") + 1)..$]); 318 } else { 319 throw(new StalkdException(to!string("Server responded with a " ~ response ~ " error."))); 320 } 321 return(total); 322 } 323 324 /** 325 * This function kicks a specific job if it is sitting in the buried or 326 * delayed queues. 327 */ 328 public void kickJob(uint jobId) { 329 send(null, "kick-job", jobId); 330 auto response = receive(); 331 if(response != "KICKED") { 332 throw(new StalkdException(to!string("Server responded with a " ~ response ~ " error."))); 333 } 334 } 335 336 /** 337 * This function 'peeks' at the Beanstalk ready queue to see if there is a 338 * job available. If there is a job is is returned. Note that peeking does 339 * not reserve the job returned. 340 * 341 * Returns: A Job if one is available, null otherwise. 342 */ 343 public Job peek() { 344 return(peekFor("ready")); 345 } 346 347 /** 348 * This function 'peeks' at the Beanstalk delayed queue to see if there is a 349 * job available. If there is a job is is returned. Note that peeking does 350 * not reserve the job returned. 351 * 352 * Returns: A Job if one is available, null otherwise. 353 */ 354 public Job peekDelayed() { 355 return(peekFor("delayed")); 356 } 357 358 /** 359 * This function 'peeks' at the Beanstalk buried queue to see if there is a 360 * job available. If there is a job is is returned. Note that peeking does 361 * not reserve the job returned. 362 * 363 * Returns: A Job if one is available, null otherwise. 364 */ 365 public Job peekBuried() { 366 return(peekFor("buried")); 367 } 368 369 /** 370 * This function peeks at Beanstalks contents to see if a job with a given 371 * id exists. If it does it is returned. Note that peeking does not reserve 372 * the job returned. 373 * 374 * Returns: A Job if the job exists, null otherwise. 375 * 376 * Params: 377 * jobId = The unique identifier of the job to peek for. 378 */ 379 public Job peekForId(uint jobId) { 380 return(doPeek(to!string("peek " ~ to!string(jobId)))); 381 } 382 383 /** 384 * This function deletes a specific job from Beanstalk. Note that you must 385 * have reserved the job before you can delete it. 386 * 387 * Params: 388 * jobId = The unique identifier of the job to delete. 389 */ 390 public void deleteJob(uint jobId) { 391 send(null, "delete", jobId); 392 auto response = receive(); 393 if(response != "DELETED") { 394 throw(new StalkdException(to!string("3. Server returned a " ~ response ~ " error."))); 395 } 396 } 397 398 /** 399 * This function releases a previously reserved job back to Beanstalk control. 400 * 401 * Params: 402 * jobId = The unique identifier of the job to released. 403 * delay = The delay to be applied to the job when it is released 404 * back to Beanstalk. Defaults to Job.DEFAULT_DELAY. 405 * priority = The priority to be applied to the job when it is released 406 * back to Beanstalk. Defaults to Job.DEFAULT_PRIORITY. 407 */ 408 public void releaseJob(uint jobId, uint delay=Job.DEFAULT_DELAY, uint priority=Job.DEFAULT_PRIORITY) { 409 send(null, "release", jobId, priority, delay); 410 auto response = receive(); 411 if(response == "BURIED") { 412 throw(new StalkdException(to!string("Server had insufficient memory to grow its priority queue. Job id " ~ to!string(jobId) ~ " was buried."))); 413 } else if(response != "RELEASED") { 414 throw(new StalkdException(to!string("4. Server returned a " ~ response ~ " error."))); 415 } 416 } 417 418 /** 419 * This function buries a specified job. Note that you must have first 420 * reserved the job before you can bury it. 421 * 422 * Params: 423 * jobId = The unique identifier of the job to bury. 424 * priority = The priority to assign to the job as part of burying it. 425 * Defaults to Job.DEFAULT_PRIORITY. 426 */ 427 public void buryJob(uint jobId, uint priority=Job.DEFAULT_PRIORITY) { 428 send(null, "bury", jobId, priority); 429 auto response = receive(); 430 if(response != "BURIED") { 431 throw(new StalkdException(to!string("5. Server returned a " ~ response ~ " error."))); 432 } 433 } 434 435 /** 436 * This function touches a job, extending its time to run on the server. Note 437 * that you must have first reserved the job before you can touch it. 438 * 439 * Params: 440 * jobId = The unique identifier of the job to touch. 441 */ 442 public void touchJob(uint jobId) { 443 send(null, "touch", jobId); 444 auto response = receive(); 445 if(response != "TOUCHED") { 446 throw(new StalkdException(to!string("6. Server returned a " ~ response ~ " error."))); 447 } 448 } 449 450 /** 451 * This function is used internally by the class to dispatch requests to 452 * the Beanstalk server. 453 * 454 * Params: 455 * data = The data to be sent in the request. If null then no 456 * data is sent. 457 * parameters = The parameters to be prefixed to the data being sent. 458 */ 459 private void send(T...)(in ubyte[] data, T parameters) { 460 OutBuffer buffer = new OutBuffer; 461 string request; 462 uint index; 463 464 foreach(parameter; parameters) { 465 if(index > 0) { 466 request ~= " "; 467 } 468 request ~= to!string(parameter); 469 index++; 470 } 471 request ~= "\r\n"; 472 buffer.reserve(request.length + (data ? data.length : 0) + 2); 473 buffer.write(request); 474 475 if(data !is null && data.length > 0) { 476 buffer.write(data); 477 buffer.write("\r\n"); 478 } 479 480 if(_connection.socket.send(buffer.toBytes()) == Socket.ERROR) { 481 throw(new StalkdException("Error sending data on server connection.")); 482 } 483 } 484 485 /** 486 * This function is used internally by the class wherever a simple answer is 487 * expected to a request. 488 * 489 * Returns: A string containing the response value read. Note that trailing 490 * whitespace on the response will have been removed. 491 */ 492 private string receive() { 493 char[] response = new char[100]; 494 auto total = _connection.socket.receive(response); 495 496 if(total == Socket.ERROR) { 497 throw(new StalkdException("Error reading from server connection.")); 498 } else if(total == 0) { 499 throw(new StalkdException("Connection to server unexpectedly terminated.")); 500 } 501 502 return(to!string(response[0..total]).chomp()); 503 } 504 505 /** 506 * This function us used internally by the class to read job data into an 507 * OutBuffer instance. 508 * 509 * Params: 510 * buffer = The buffer to place the bytes read into. 511 * quantity = The number of bytes of data to be read in. 512 */ 513 private void readInJobData(ref OutBuffer buffer, uint quantity) { 514 ubyte[] data = new ubyte[quantity + 2]; 515 auto total = _connection.socket.receive(data); 516 517 if(total == Socket.ERROR) { 518 throw(new StalkdException("Error retrieving response from server.")); 519 } else if(total == 0) { 520 throw(new StalkdException("Server connection closed unexpectedly.")); 521 } 522 data = data[0..($ - 2)]; 523 buffer.write(data); 524 } 525 526 /** 527 * This function is used internally by the class to check for available jobs 528 * of a specified type. 529 * 530 * Params: 531 * type = A string that should be either "ready", "delayed" or "buried". 532 */ 533 private Job peekFor(string type) { 534 return(doPeek(to!string("peek-" ~ type))); 535 } 536 537 /** 538 * This function performs a peek operation against the server. 539 * 540 * Params: 541 * request = The request to be sent to the server. 542 */ 543 private Job doPeek(string request) { 544 Job job = null; 545 char[] response = new char[100]; 546 547 send(null, request); 548 549 auto total = _connection.socket.receive(response); 550 if(total == Socket.ERROR) { 551 throw(new StalkdException("Error reading from server connection.")); 552 } else if(total == 0) { 553 throw(new StalkdException("Connection to server unexpectedly terminated.")); 554 } 555 response = response[0..total].chomp(); 556 557 if(response.startsWith("FOUND")) { 558 uint jobId; 559 ulong size, 560 read; 561 size_t[] offsets = [0, 0, 0]; 562 OutBuffer buffer; 563 564 offsets[0] = std..string.indexOf(response, " "); 565 offsets[1] = std..string.indexOf(response, " ", (offsets[0] + 1)); 566 offsets[2] = std..string.indexOf(response, "\r\n", (offsets[1] + 1)); 567 if(!offsets.find(-1).empty) { 568 throw(new StalkdException("Unrecognised response received from server.")); 569 } 570 571 jobId = to!uint(response[(offsets[0] + 1)..offsets[1]]); 572 size = to!size_t(response[(offsets[1] + 1)..offsets[2]]); 573 read = response.length - (offsets[2] + 2); 574 buffer = new OutBuffer; 575 buffer.reserve(cast(uint)size); 576 577 if(read > 0) { 578 auto endPoint = response.length, 579 available = endPoint - (offsets[2] + 2); 580 581 while(available > size) { 582 endPoint--; 583 available = endPoint - (offsets[2] + 2); 584 } 585 586 buffer.write(response[(offsets[2] + 2)..$]); 587 } 588 if(size > read) { 589 readInJobData(buffer, cast(uint)(size - read)); 590 } 591 592 job = new Job; 593 job.id = jobId; 594 job.tube = this; 595 job.write(buffer.toBytes()); 596 } else if(!response.startsWith("NOT_FOUND")) { 597 throw(new StalkdException(to!string("7. Server returned a " ~ response ~ " error."))); 598 } 599 600 return(job); 601 } 602 603 private Connection _connection; 604 private string _using; 605 private string[] _watching; 606 } 607 608 //------------------------------------------------------------------------------ 609 // Unit Tests 610 //------------------------------------------------------------------------------ 611 /* 612 * NOTE: There is a limit to the amount of unit testing that can be performed 613 * without an actual server connection. For this reason, the test below 614 * check for the presence of an available test Beanstalkd instance via 615 * the existence of the BEANSTALKD_TEST_HOST environment variable. If 616 * this is set then an attempt will be made to connect to it to perform 617 * an additional series of tests. You can specify the port for this test 618 * server using the BEANSTALKD_TEST_PORT environment variable. As the 619 * queues on this server will be added to, deleted from and cleared of 620 * content as part of the tests this server should not be used for any 621 * other purpose! 622 */ 623 unittest { 624 import core.thread; 625 import core.time; 626 import std.stdio; 627 import std.conv; 628 import std.process; 629 import std.exception; 630 import stalkd; 631 632 auto connection = new Connection("127.0.0.1"); 633 auto tube = new Tube(connection); 634 635 assert(tube.connection is connection); 636 assert(tube.using is Tube.DEFAULT_TUBE_NAME); 637 assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]); 638 639 auto host = environment.get("BEANSTALKD_TEST_HOST"); 640 if(host !is null) { 641 writeln("The BEANSTALKD_TEST_HOST environment variable is set, conducting advanced tests for the Tube class."); 642 ushort port = Server.DEFAULT_BEANSTALKD_PORT; 643 if(environment.get("BEANSTALKD_TEST_PORT") !is null) { 644 port = to!ushort(environment.get("BEANSTALKD_TEST_PORT")); 645 } 646 connection = new Connection(host, port); 647 648 string tubeName = "alternative"; 649 tube = new Tube(connection); 650 651 void useTube() { 652 tube.use(tubeName); 653 } 654 655 void watchTube() { 656 tube.watch(tubeName); 657 } 658 659 void ignoreTube() { 660 tube.ignore(tubeName); 661 } 662 663 // Test: Use a tube name. 664 assertNotThrown!StalkdException(useTube); 665 assert(tube.using is tubeName); 666 assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]); 667 668 // Test: Use can switch between tube names multiple times. 669 tubeName = Tube.DEFAULT_TUBE_NAME; 670 assertNotThrown!StalkdException(useTube); 671 assert(tube.using is tubeName); 672 assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]); 673 674 // Test: Watch a tube name. 675 tubeName = "alternative"; 676 assertNotThrown!StalkdException(watchTube); 677 assert(tube.using is Tube.DEFAULT_TUBE_NAME); 678 assert(tube.watching == [Tube.DEFAULT_TUBE_NAME, tubeName]); 679 680 // Test: Ignore a tube name. 681 assertNotThrown!StalkdException(ignoreTube); 682 assert(tube.using is Tube.DEFAULT_TUBE_NAME); 683 assert(tube.watching == [Tube.DEFAULT_TUBE_NAME]); 684 685 // Test: The default tube name can be ignored. 686 assertNotThrown!StalkdException(watchTube); 687 tubeName = Tube.DEFAULT_TUBE_NAME; 688 assertNotThrown!StalkdException(ignoreTube); 689 assert(tube.using is Tube.DEFAULT_TUBE_NAME); 690 assert(tube.watching == ["alternative"]); 691 692 // Test: You can't unwatch all tubes. 693 tubeName = "alternative"; 694 assertThrown!StalkdException(ignoreTube); 695 696 // Clear any existing content from the tube before starting. 697 Job job; 698 while((job = tube.peek()) !is null) { 699 tube.deleteJob(job.id); 700 } 701 while((job = tube.peekBuried()) !is null) { 702 tube.deleteJob(job.id); 703 } 704 705 // Put a job into a tube. 706 job = new Job("Job data."); 707 tube = new Tube(Server(host, port)); 708 void putJob() { 709 tube.put(job); 710 } 711 assertNotThrown!StalkdException(putJob); 712 713 // Test: Peek to see if the job is there. 714 void peekJob() { 715 job = tube.peek(); 716 } 717 assertNotThrown!StalkdException(peekJob); 718 assert(job !is null); 719 assert(job.bodyAsString() == "Job data."); 720 721 // Test: Reserve a job without timeout. 722 void reserveJob() { 723 job = tube.reserve(); 724 } 725 assertNotThrown!StalkdException(reserveJob); 726 assert(job.bodyAsString() == "Job data."); 727 728 // Test: Releasing a job. 729 void releaseJob() { 730 tube.releaseJob(job.id); 731 } 732 assertNotThrown!StalkdException(releaseJob); 733 734 // Test: Reserve a job from a tube with timeout. 735 Nullable!Job reserved; 736 void reserveJobWithTimeOut() { 737 reserved = tube.reserve(3); 738 if(!reserved.isNull) { 739 job = reserved.get(); 740 } 741 } 742 assertNotThrown!StalkdException(reserveJobWithTimeOut); 743 assert(!reserved.isNull); 744 assert(job.bodyAsString() == "Job data."); 745 assertNotThrown!StalkdException(releaseJob); 746 747 // Test: Deleting a job. 748 void deleteJob() { 749 tube.deleteJob(job.id); 750 } 751 assertNotThrown!StalkdException(reserveJob); 752 assertNotThrown!StalkdException(deleteJob); 753 assert(tube.peek() is null); 754 755 // Test: Burying a job. 756 void buryJob() { 757 tube.buryJob(job.id); 758 } 759 void peekBuried() { 760 job = tube.peekBuried(); 761 } 762 job = new Job("A different set of job data."); 763 assertNotThrown!StalkdException(putJob); 764 assertNotThrown!StalkdException(reserveJob); 765 assertNotThrown!StalkdException(buryJob); 766 assertNotThrown!StalkdException(peekBuried); 767 assert(job !is null); 768 assert(job.bodyAsString() == "A different set of job data."); 769 770 // Test: Kicking a job. 771 auto kicked = 0; 772 void kickJob() { 773 kicked = tube.kick(100); 774 } 775 assertNotThrown!StalkdException(kickJob); 776 assert(kicked == 1); 777 assert(tube.peek() !is null); 778 assertNotThrown!StalkdException(deleteJob); 779 780 // Test: Touching a job. 781 void touchJob() { 782 tube.touchJob(job.id); 783 } 784 assertNotThrown!StalkdException(putJob); 785 assertNotThrown!StalkdException(reserveJob); 786 assertNotThrown!StalkdException(touchJob); 787 assertNotThrown!StalkdException(releaseJob); 788 assertNotThrown!StalkdException(peekJob); 789 assert(job !is null); 790 assert(job.bodyAsString() == "A different set of job data."); 791 assertNotThrown!StalkdException(deleteJob); 792 } else { 793 writeln("The BEANSTALKD_TEST_HOST environment variable is not set, advanced tests for the Tube class skipped."); 794 } 795 }