首页 > 代码库 > Redis 分布式锁

Redis 分布式锁

  1 #include <stdio.h>  2 #include <stdlib.h>  3 #include <string.h>  4 // --------------------  5 #include <unistd.h>  6 #include <pthread.h>  7 // --------------------  8 #include <hiredis.h>  9 // -------------------- 10 #include "thread_helper.h" 11  12 #define ATOMIC_UNLOCK 0 13 #define ATOMIC_LOCK 1 14  15 int lock = 0; 16 int num = 0; 17 int total_num = 50; 18 ThreadSchema* ts; 19  20 bool acquire_lock(int sn, redisContext* c) { 21     redisReply* r = NULL; 22     int res = 0; 23  24     if(NULL == c) 25         return false; 26  27     while(true) { 28         r = (redisReply*)redisCommand(c,"SETNX tom_lock 1"); 29         res = r->integer; 30         freeReplyObject(r); 31  32         if (res == 1) { 33             r = (redisReply*)redisCommand(c,"EXPIRE tom_lock 60"); 34             freeReplyObject(r); 35             return true; 36         } else { 37  38             r = (redisReply*)redisCommand(c,"TTL tom_lock"); 39             res = r->integer; 40             //printf("acquire_lock %d %d\n", r->type, r->integer); 41             freeReplyObject(r); 42  43             if(res == -1){ 44                 r = (redisReply*)redisCommand(c,"EXPIRE tom_lock 1"); 45                 freeReplyObject(r); 46             } 47         } 48  49         usleep(1000); 50     } 51  52     return false; 53 } 54  55 bool release_lock(int sn, redisContext* c) { 56     redisReply* r = NULL; 57  58     while(true) { 59         r = (redisReply*)redisCommand(c,"WATCH tom_lock"); 60         freeReplyObject(r); 61  62         r = (redisReply*)redisCommand(c,"GET tom_lock"); 63  64         //printf("release_lock %lld\n", r->integer); 65  66         if (((REDIS_REPLY_INTEGER == r->type) && (1 == r->integer)) ||  67             ((REDIS_REPLY_STRING == r->type) && (strcmp("1", r->str) == 0)) 68             ) { 69  70             //printf("release_lock state 1\n"); 71  72             freeReplyObject(r); 73  74             r = (redisReply*)redisCommand(c,"MULTI"); 75             freeReplyObject(r); 76  77             r = (redisReply*)redisCommand(c,"DEL tom_lock"); 78             freeReplyObject(r); 79  80             r = (redisReply*)redisCommand(c,"EXEC"); 81  82             return true; 83         } else if(REDIS_REPLY_NIL == r->type) { 84             printf("release_lock state 2\n"); 85  86             freeReplyObject(r); 87  88             r = (redisReply*)redisCommand(c,"UNWATCH"); 89             freeReplyObject(r); 90  91             return true; 92         } else { 93             printf("release_lock sn:%d, tate 3\n", sn); 94  95             freeReplyObject(r); 96  97             r = (redisReply*)redisCommand(c,"UNWATCH"); 98             freeReplyObject(r); 99         }100 101         usleep(1000);102     }103 104     return false;105 }106 107 void* test_thread(void* arg) {108     int sn = *((int*)arg);109     redisContext* c = ts->rctx[sn];110     redisReply* r = NULL;111     int tom = 0;112 113     while(true) {114 115         //printf("En Lock sn:%d\n", sn);116         fflush(stdout);117 118         if(!acquire_lock(sn, c)) {119             printf("Co Lock\n");120             fflush(stdout);121             sched_yield();122             continue;123         }124 125         // printf("Lv Lock sn:%d\n", sn);126         // fflush(stdout);127 128         r = (redisReply*)redisCommand(c,"GET tom");129         tom = atoi(r->str);130         freeReplyObject(r);131 132         if(tom >= 50) {133             release_lock(sn, c);134             break;135         }136 137         tom += 1;138         printf("sn: %d, tom: %d\n", sn, tom);139         fflush(stdout);140 141         r = (redisReply*)redisCommand(c,"SET tom %d", tom);142         freeReplyObject(r);143 144         // printf("En UnLock sn:%d\n", sn);145         // fflush(stdout);146 147         release_lock(sn, c);148 149         //usleep(1);150         // printf("Lv UnLock sn:%d\n", sn);151         // fflush(stdout);152     }153 154     printf("Lv Test:%d\n", sn);155     fflush(stdout);156 157     return NULL;158 }159 160 int main(int argc, char* argv[])161 {162     ts = new ThreadSchema(20);163 164     const char *hostname = "127.0.0.1";165     int port = 6379;166     struct timeval timeout = { 1, 500000 }; // 1.5 seconds167 168     for(int i=0; i<ts->num; ++i) {169         ts->rctx[i] = redisConnectWithTimeout(hostname, port, timeout);170         if (ts->rctx[i] == NULL || ts->rctx[i]->err) {171             if (ts->rctx[i]) {172                 printf("Connection error: %s\n", ts->rctx[i]->errstr);173             } else {174                 printf("Connection error: can‘t allocate redis context\n");175             }176             exit(1);177         }178     }179 180     redisReply* r = (redisReply*)redisCommand(ts->rctx[0],"DEL tom_lock");181     freeReplyObject(r);182 183     r = (redisReply*)redisCommand(ts->rctx[0],"SET tom 0");184     freeReplyObject(r);185 186     for(int i=0; i<ts->num; ++i)187     {188         pthread_create(&ts->pid[i], NULL, test_thread, &ts->sn[i]);189     }190 191     /* Disconnects and frees the context */192 193     while(num < total_num)194     {195         sched_yield();196     }197 198     return 0;199 }

 

#ifndef THREAD_HELPER_H#define THREAD_HELPER_H#include <hiredis.h>#include <pthread.h>class ThreadSchema{public:    ThreadSchema(int n);    ~ThreadSchema();    int             num;    int*            sn;    redisContext**  rctx;    pthread_t*      pid;};#endif

 

 1 #include "thread_helper.h" 2 #include <memory.h> 3 #include <stdlib.h> 4  5 ThreadSchema::ThreadSchema(int n) 6 { 7     num = n; 8  9     sn = (int*)malloc(sizeof(int) * num);10     if (NULL != sn) {11         for (int i=0; i < num; ++i) {12             sn[i] = i;13         }14     }15 16     rctx = (redisContext**)malloc(sizeof(redisContext*) * num);17     if (NULL != rctx) {18         memset(rctx, 0, sizeof(redisContext*) * num);19     }20 21     pid = (pthread_t*)malloc(sizeof(pthread_t) * num);22     if(NULL != pid) {23         memset(pid, 0, sizeof(pthread_t*) * num);24     }25 }26 27 ThreadSchema::~ThreadSchema() {28 29     for (int i=0; i < num; ++i) {30         if(NULL != rctx[i]) {31             redisFree(rctx[i]);32         }33     }34 35     if(NULL != rctx) {36         free(rctx);37         rctx = NULL;38     }39 40     if(NULL != pid) {41         free(pid);42         pid = NULL;43     }44 }

 

Redis 分布式锁