RODY/app/utils/SnowflakeTool.py
552068321@qq.com 6f7de660aa first commit
2022-11-04 17:37:08 +08:00

83 lines
2.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@author: cxyfreedom
@desc: 基于 snowflake 生成分布式ID
"""
import time
# 每一部分占用的位数
TIMESTAMP_BIT = 41 # 时间戳占用位数
MACHINE_BIT = 5 # 机器标识占用的位数
DATACENTER_BIT = 5 # 数据中心占用的位数
SEQUENCE_BIT = 12 # 序列号占用的位数
# 每一部分的最大值
MAX_DATACENTER_NUM = -1 ^ (-1 << DATACENTER_BIT)
MAX_MACHINE_NUM = -1 ^ (-1 << MACHINE_BIT)
MAX_SEQUENCE = -1 ^ (-1 << SEQUENCE_BIT)
# 每一部分向左的位移
MACHINE_LEFT = SEQUENCE_BIT
DATACENTER_LEFT = MACHINE_BIT + SEQUENCE_BIT
TIMESTAMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT
class SnowFlake:
class OverflowError(TypeError):
"""
分布式ID生成算法占位符溢出异常会导致生成ID为负数
"""
pass
class RuntimeError(TypeError):
"""
运行时间错误,在此项目中当前运行时间小于上一次运行时间。
"""
pass
def __init__(self):
if TIMESTAMP_BIT + SEQUENCE_BIT + MACHINE_BIT + DATACENTER_BIT != 63:
raise self.OverflowError(
"TIMESTAMP_BIT + SEQUENCE_BIT + MACHINE_BIT + DATACENTER_BIT not equal to 63bit")
self.datacenter_id = 0 # 数据中心编号
self.machineId = 0 # 机器标识编号
self.sequence = 0 # 序列号
self.last_stamp = -1 # 上一次时间戳
def nextId(self):
"""生成下一个ID"""
cur_stamp = self.get_new_stamp()
if cur_stamp < self.last_stamp:
raise self.RuntimeError(
"Clock moved backwards. Refusing to generate id")
if cur_stamp == self.last_stamp:
# 相同毫秒内,序列号自增
self.sequence = (self.sequence + 1) & MAX_SEQUENCE
# 同一秒的序列数已经达到最大
if self.sequence == 0:
cur_stamp = self.get_next_mill()
else:
# 不同秒内序列号为0
self.sequence = 0
self.last_stamp = cur_stamp
return (cur_stamp << TIMESTAMP_LEFT) | (
self.datacenter_id << DATACENTER_LEFT) | (
self.machineId << MACHINE_LEFT) | self.sequence
def get_next_mill(self):
mill = self.get_new_stamp()
while mill <= self.last_stamp:
mill = self.get_new_stamp()
return mill
@staticmethod
def get_new_stamp():
now = lambda: int(time.time() * 1000)
return now()
if __name__ == "__main__":
print(SnowFlake().nextId())