首页 > 代码库 > Redis 分布式锁
Redis 分布式锁
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 // -------------------- 5 #include <unistd.h> 6 #include <pthread.h> 7 // -------------------- 8 #include <hiredis.h> 9 // -------------------- 10 #include "thread_helper.h" 11 12 #define ATOMIC_UNLOCK 0 13 #define ATOMIC_LOCK 1 14 15 int lock = 0; 16 int num = 0; 17 int total_num = 50; 18 ThreadSchema* ts; 19 20 bool acquire_lock(int sn, redisContext* c) { 21 redisReply* r = NULL; 22 int res = 0; 23 24 if(NULL == c) 25 return false; 26 27 while(true) { 28 r = (redisReply*)redisCommand(c,"SETNX tom_lock 1"); 29 res = r->integer; 30 freeReplyObject(r); 31 32 if (res == 1) { 33 r = (redisReply*)redisCommand(c,"EXPIRE tom_lock 60"); 34 freeReplyObject(r); 35 return true; 36 } else { 37 38 r = (redisReply*)redisCommand(c,"TTL tom_lock"); 39 res = r->integer; 40 //printf("acquire_lock %d %d\n", r->type, r->integer); 41 freeReplyObject(r); 42 43 if(res == -1){ 44 r = (redisReply*)redisCommand(c,"EXPIRE tom_lock 1"); 45 freeReplyObject(r); 46 } 47 } 48 49 usleep(1000); 50 } 51 52 return false; 53 } 54 55 bool release_lock(int sn, redisContext* c) { 56 redisReply* r = NULL; 57 58 while(true) { 59 r = (redisReply*)redisCommand(c,"WATCH tom_lock"); 60 freeReplyObject(r); 61 62 r = (redisReply*)redisCommand(c,"GET tom_lock"); 63 64 //printf("release_lock %lld\n", r->integer); 65 66 if (((REDIS_REPLY_INTEGER == r->type) && (1 == r->integer)) || 67 ((REDIS_REPLY_STRING == r->type) && (strcmp("1", r->str) == 0)) 68 ) { 69 70 //printf("release_lock state 1\n"); 71 72 freeReplyObject(r); 73 74 r = (redisReply*)redisCommand(c,"MULTI"); 75 freeReplyObject(r); 76 77 r = (redisReply*)redisCommand(c,"DEL tom_lock"); 78 freeReplyObject(r); 79 80 r = (redisReply*)redisCommand(c,"EXEC"); 81 82 return true; 83 } else if(REDIS_REPLY_NIL == r->type) { 84 printf("release_lock state 2\n"); 85 86 freeReplyObject(r); 87 88 r = (redisReply*)redisCommand(c,"UNWATCH"); 89 freeReplyObject(r); 90 91 return true; 92 } else { 93 printf("release_lock sn:%d, tate 3\n", sn); 94 95 freeReplyObject(r); 96 97 r = (redisReply*)redisCommand(c,"UNWATCH"); 98 freeReplyObject(r); 99 }100 101 usleep(1000);102 }103 104 return false;105 }106 107 void* test_thread(void* arg) {108 int sn = *((int*)arg);109 redisContext* c = ts->rctx[sn];110 redisReply* r = NULL;111 int tom = 0;112 113 while(true) {114 115 //printf("En Lock sn:%d\n", sn);116 fflush(stdout);117 118 if(!acquire_lock(sn, c)) {119 printf("Co Lock\n");120 fflush(stdout);121 sched_yield();122 continue;123 }124 125 // printf("Lv Lock sn:%d\n", sn);126 // fflush(stdout);127 128 r = (redisReply*)redisCommand(c,"GET tom");129 tom = atoi(r->str);130 freeReplyObject(r);131 132 if(tom >= 50) {133 release_lock(sn, c);134 break;135 }136 137 tom += 1;138 printf("sn: %d, tom: %d\n", sn, tom);139 fflush(stdout);140 141 r = (redisReply*)redisCommand(c,"SET tom %d", tom);142 freeReplyObject(r);143 144 // printf("En UnLock sn:%d\n", sn);145 // fflush(stdout);146 147 release_lock(sn, c);148 149 //usleep(1);150 // printf("Lv UnLock sn:%d\n", sn);151 // fflush(stdout);152 }153 154 printf("Lv Test:%d\n", sn);155 fflush(stdout);156 157 return NULL;158 }159 160 int main(int argc, char* argv[])161 {162 ts = new ThreadSchema(20);163 164 const char *hostname = "127.0.0.1";165 int port = 6379;166 struct timeval timeout = { 1, 500000 }; // 1.5 seconds167 168 for(int i=0; i<ts->num; ++i) {169 ts->rctx[i] = redisConnectWithTimeout(hostname, port, timeout);170 if (ts->rctx[i] == NULL || ts->rctx[i]->err) {171 if (ts->rctx[i]) {172 printf("Connection error: %s\n", ts->rctx[i]->errstr);173 } else {174 printf("Connection error: can‘t allocate redis context\n");175 }176 exit(1);177 }178 }179 180 redisReply* r = (redisReply*)redisCommand(ts->rctx[0],"DEL tom_lock");181 freeReplyObject(r);182 183 r = (redisReply*)redisCommand(ts->rctx[0],"SET tom 0");184 freeReplyObject(r);185 186 for(int i=0; i<ts->num; ++i)187 {188 pthread_create(&ts->pid[i], NULL, test_thread, &ts->sn[i]);189 }190 191 /* Disconnects and frees the context */192 193 while(num < total_num)194 {195 sched_yield();196 }197 198 return 0;199 }
#ifndef THREAD_HELPER_H#define THREAD_HELPER_H#include <hiredis.h>#include <pthread.h>class ThreadSchema{public: ThreadSchema(int n); ~ThreadSchema(); int num; int* sn; redisContext** rctx; pthread_t* pid;};#endif
1 #include "thread_helper.h" 2 #include <memory.h> 3 #include <stdlib.h> 4 5 ThreadSchema::ThreadSchema(int n) 6 { 7 num = n; 8 9 sn = (int*)malloc(sizeof(int) * num);10 if (NULL != sn) {11 for (int i=0; i < num; ++i) {12 sn[i] = i;13 }14 }15 16 rctx = (redisContext**)malloc(sizeof(redisContext*) * num);17 if (NULL != rctx) {18 memset(rctx, 0, sizeof(redisContext*) * num);19 }20 21 pid = (pthread_t*)malloc(sizeof(pthread_t) * num);22 if(NULL != pid) {23 memset(pid, 0, sizeof(pthread_t*) * num);24 }25 }26 27 ThreadSchema::~ThreadSchema() {28 29 for (int i=0; i < num; ++i) {30 if(NULL != rctx[i]) {31 redisFree(rctx[i]);32 }33 }34 35 if(NULL != rctx) {36 free(rctx);37 rctx = NULL;38 }39 40 if(NULL != pid) {41 free(pid);42 pid = NULL;43 }44 }
Redis 分布式锁
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。