admin管理员组

文章数量:1444600

高并发内存池并不难~

这个项目做的是什么?

原型是源于Google开源项目tcmalloc,其全称是Thread-Caching Malloc,即为具有线程缓存的malloc,实现了搞笑的多线程内存管理,用于代替系统的malloc和free。

其就是添加三级缓存为中间介质管理内存内碎片,减少内存损耗时间。

什么是内存池

1.池化技术

所谓 “ 池化技术 ” ,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。

在计算机中,有很多使用 “ 池 ” 这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠

状态。

2.内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出( 或者特定时间 ) 时,内存池才将之前申请的内存真正释放。

3.内存池主要解决的问题

内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?

再需要补充说明的是内存碎片分为外碎片和内碎片,上面我们讲的外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题。

4.malloc

C/C++ 中我们要动态申请内存都是通过 malloc 去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc 就是一个内存池。 malloc() 相当于向操作系统 “ 批发 ” 了一块较大的内存空间,然后 “ 零售 ” 给程序用。当全部“ 售完 ” 或程序有大量的内存需求时,再根据实际需求向操作系统 “ 进货 ” 。 malloc 的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows 的 vs 系列用的微软自己写的一套,linux gcc用的 glibc 中的 ptmalloc

定长内存池

代码语言:javascript代码运行次数:0运行复制
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
    void* ptr = VirtualAlloc(0, kpage*(1<<12), MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
    // linux下brk mmap等
#endif
    if (ptr == nullptr)
        throw std::bad_alloc();
    return ptr;
}

template<class T>
class ObjectPool
{
public:
    T* New()
    {
        T* obj = nullptr;
        // 如果自由链表有对象,直接取一个
        if (_freeList)
        {
            obj = (T*)_freeList;
            _freeList = *((void**)_freeList);
        }
        else
        {
            if (_leftBytes < sizeof(T))
            {
                _leftBytes = 128 * 1024;
                //_memory = (char*)malloc(_leftBytes);
                _memory = (char*)SystemAlloc(_leftBytes);
                if (_memory == nullptr)
                {
                    throw std::bad_alloc();
                }
            }
            obj = (T*)_memory;
            size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
            _memory += objSize;
            _leftBytes -= objSize;
        }
        // 使用定位new调用T的构造函数初始化
        new(obj)T;
        return obj;
    }

    void Delete(T* obj)
    {
        // 显示调用的T的析构函数进行清理
        obj->~T();
        // 头插到freeList
        *((void**)obj) = _freeList;
        _freeList = obj;
    }

private:
    char* _memory = nullptr; // 指向内存块的指针
    int _leftBytes = 0; // 内存块中剩余字节数
    void* _freeList = nullptr;  // 管理还回来的内存对象的自由链表
};

struct TreeNode
{
    int _val;
    TreeNode* _left;
    TreeNode* _right;

    TreeNode()
        : _val(0)
        , _left(nullptr)
        , _right(nullptr)
    {}
};

void TestObjectPool()
{
    // 申请释放的轮次
    const size_t Rounds = 3;
    // 每轮申请释放多少次
    const size_t N = 100000; 
    size_t begin1 = clock();
    std::vector<TreeNode*> v1;
    v1.reserve(N);
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v1.push_back(new TreeNode);
        }
        for (int i = 0; i < N; ++i)
        {
            delete v1[i];
        }
        v1.clear();
    }
    size_t end1 = clock();

    ObjectPool<TreeNode> TNPool;
    size_t begin2 = clock();
    std::vector<TreeNode*> v2;
    v2.reserve(N);
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v2.push_back(TNPool.New());
        }
        for (int i = 0; i < N; ++i)
        {
            TNPool.Delete(v2[i]);
        }
        v2.clear();
    }
    size_t end2 = clock();

    cout <<"new cost time:" <<end1 - begin1 << endl;
    cout <<"object pool cost time:" <<end2 - begin2 << endl;
}

高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。 malloc 本身其实已经很优秀,那么我们项目的原型tcmalloc 就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题

1. 性能问题。 2. 多线程环境下,锁竞争问题。 3. 内存碎片问题。

concurrent memory pool 主要由以下 3 个部分构成:

1. thread cache :线程缓存是每个线程独有的,用于小于 256KB 的内存的分配, 线程从这里申请内 存不需要加锁,每个线程独享一个 cache ,这也就是这个并发线程池高效的地方 。 2. central cache :中心缓存是所有线程所共享, thread cache 是 按需从 central cache 中获取 的对象。central cache 合适的时机回收 thread cache 中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的 。 central cache 是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有 thread cache 没有内存对象时才会找 central cache ,所以这里竞争不会很激烈 。 3. page cache :页缓存是在 central cache 缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache 没有内存对象时,从 page cache 分配出一定数量的 page ,并切割成定长大小的小块内存,分配给central cache 。 当一个 span 的几个跨度页的对象都回收以后, page cache 会回收 central cache 满足条件的 span 对象,并且合并相邻的页,组成更大的页,缓解内存碎片 的问题

高并发内存池--thread cache

thread cache 是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache 对象,这样每个线程在这里获取对象和释放对象时是无锁的。

申请内存:

1. 当内存申请 size<=256KB ,先获取到线程本地存储的 thread cache 对象,计算 size 映射的哈希桶自由链表下标i 。 2. 如果自由链表 _freeLists[i] 中有对象,则直接 Pop 一个内存对象返回。 3. 如果 _freeLists[i] 中没有对象时,则批量从 central cache 中获取一定数量的对象,插入到自由链表并返回一个对象

释放内存:

1. 当释放内存小于 256k 时将内存释放回 thread cache ,计算 size 映射自由链表桶位置 i ,将对象 Push到_freeLists[i] 。 2. 当链表的长度过长,则回收一部分内存对象到 central cache 。

thread cache 代码框架:

代码语言:javascript代码运行次数:0运行复制
// thread cache本质是由一个哈希映射的对象自由链表构成
class ThreadCache
{
public:
    // 申请和释放内存对象
    void* Allocate(size_t size);
    void Deallocate(void* ptr, size_t size);
    // 从中心缓存获取对象
    void* FetchFromCentralCache(size_t index, size_t size);
    // 释放对象时,链表过长时,回收内存回到中心缓存
    void ListTooLong(FreeList& list, size_t size);

private:
    FreeList _freeLists[NFREELIST];
};

// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

// 管理小对象的自由链表
// 管理切分好的小对象的自由链表
class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);
        // 头插
        //*(void**)obj = _freeList;
        NextObj(obj) = _freeList;
        _freeList = obj;
        ++_size;
    }

    void PushRange(void* start, void* end, size_t n)
    {
        NextObj(end) = _freeList;
        _freeList = start;
        _size += n;
    }

    void PopRange(void*& start, void*& end, size_t n)
    {
        assert(n >= _size);
        start = _freeList;
        end = start;
        for (size_t i = 0; i < n - 1; ++i)
        {
            end = NextObj(end);
        }
        _freeList = NextObj(end);
        NextObj(end) = nullptr;
        _size -= n;
    }

    void* Pop()
    {
        assert(_freeList);
        // 头删
        void* obj = _freeList;
        _freeList = NextObj(obj);
        --_size;
        return obj;
    }

    bool Empty()
    {
        return _freeList == nullptr;
    }

    size_t& MaxSize()
    {
        return _maxSize;
    }

    size_t Size()
    {
        return _size;
    }

private:
    void* _freeList = nullptr;
    size_t _maxSize = 1;
    size_t _size = 0;
};

自由链表的哈希桶跟对象大小的映射关系

代码语言:javascript代码运行次数:0运行复制
// 小于等于MAX_BYTES,就找thread cache申请
// 大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
// thread cache 和 central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;
// page cache 管理span list哈希表大小
static const size_t NPAGES = 129;
// 页大小转换偏移, 即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;
// 地址大小类型,32位下是4byte类型,64位下是8byte类型
#ifdef _WIN32
typedef size_t ADDR_INT;
#else
typedef unsigned long long ADDR_INT;
#endif // _WIN32
// 页编号类型,32位下是4byte类型,64位下是8byte类型
#ifdef _WIN32
typedef size_t PageID;
#else
typedef unsigned long long PageID;
#endif // _WIN32
// 获取内存对象中存储的头4 or 8字节值,即链接的下一个对象的地址
inline void*& NextObj(void* obj)
{
    return *((void**)obj);
}

// 管理对齐和映射等关系
class SizeClass
{
public:
    // 整体控制在最多10%左右的内碎片浪费
    // [1,128] 8byte对齐       freelist[0,16)
    // [128+1,1024] 16byte对齐   freelist[16,72)
    // [1024+1,8*1024] 128byte对齐   freelist[72,128)
    // [8*1024+1,64*1024] 1024byte对齐     freelist[128,184)
    // [64*1024+1,256*1024] 8*1024byte对齐   freelist[184,208)
    static inline size_t _RoundUp(size_t bytes, size_t align)
    {
        return (((bytes) + align - 1) & ~(align - 1));
    }

    // 对齐大小计算
    static inline size_t RoundUp(size_t bytes)
    {
        if (bytes <= 128)
        {
            return _RoundUp(bytes, 8);
        }
        else if (bytes <= 1024)
        {
            return _RoundUp(bytes, 16);
        }
        else if (bytes <= 8 * 1024)
        {
            return _RoundUp(bytes, 128);
        }
        else if (bytes <= 64 * 1024)
        {
            return _RoundUp(bytes, 1024);
        }
        else if (bytes <= 256 * 1024)
        {
            return _RoundUp(bytes, 8 * 1024);
        }
        else
        {
            return _RoundUp(bytes, 1 << PAGE_SHIFT);
        }
        // 注意:这里的return -1;是 unreachable code,因为所有的条件都已经覆盖
    }

    static inline size_t _Index(size_t bytes, size_t align_shift)
    {
        return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
    }

    // 计算映射的哪一个自由链表桶
    static inline size_t Index(size_t bytes)
    {
        assert(bytes <= MAX_BYTES);
        // 每个区间有多少个链
        static int group_array[4] = { 16, 56, 56, 56 };
        if (bytes <= 128)
        {
            return _Index(bytes, 3);
        }
        else if (bytes <= 1024)
        {
            return _Index(bytes - 128, 4) + group_array[0];
        }
        else if (bytes <= 8 * 1024)
        {
            return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
        }
        else if (bytes <= 64 * 1024)
        {
            return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
        }
        else if (bytes <= 256 * 1024)
        {
            return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
        }
        else
        {
            assert(false);
        }
        // 注意:这里的return -1;是 unreachable code,因为所有的条件都已经覆盖
    }

    // 一次从中心缓存获取多少个
    static size_t NumMoveSize(size_t size)
    {
        if (size == 0)
            return 0;
        // [2, 512],一次批量移动多少个对象的(慢启动)上限值
        // 小对象一次批量上限高
        // 大对象一次批量上限低
        int num = MAX_BYTES / size;
        if (num < 2)
            num = 2;
        if (num > 512)
            num = 512;
        return num;
    }

    // 单个对象 8byte
    // ...
    // 单个对象 256KB
    static size_t NumMovePage(size_t size)
    {
        size_t num = NumMoveSize(size);
        size_t npage = num * size;
        npage >>= PAGE_SHIFT;
        if (npage == 0)
            npage = 1;
        return npage;
    }
};

高并发内存池--central cache

central cache 也是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在span 的自由链表中。

申请内存:

1. 当 thread cache 中没有内存时,就会批量向 central cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp 协议拥塞控制的慢开始算法; central cache 也有一个哈希映射的spanlist, spanlist 中挂着 span ,从 span 中取出对象给 thread cache ,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。 2. central cache 映射的 spanlist 中所有 span 的都没有内存以后,则需要向 page cache 申请一个新的span对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span中取对象给thread cache 。 3. central cache 的中挂的 span 中 use_count 记录分配了多少个对象出去,分配一个对象给 thread cache,就 ++use_count

释放内存:

当 thread_cache 过长或者线程销毁,则会将内存释放回 central cache 中的,释放回来时 -- use_count 。当 use_count 减到 0 时则表示所有对象都回到了 span ,则将 span 释放回 page cache , page cache中会对前后相邻的空闲页进行合并。

CentralCache 代码框架:

代码语言:javascript代码运行次数:0运行复制
// 单例模式
class CentralCache
{
public:
    static CentralCache* GetInstance()
    {
        return &_sInst;
    }

    // 获取一个非空的span
    Span* GetOneSpan(SpanList& list, size_t byte_size);

    // 从中心缓存获取一定数量的对象给thread cache
    size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

    // 将一定数量的对象释放到span跨度
    void ReleaseListToSpans(void* start, size_t byte_size);

private:
    SpanList _spanLists[NFREELIST];

    CentralCache()
    {}

    CentralCache(const CentralCache&) = delete;

    static CentralCache _sInst;
};

以页为单位的大内存管理 span 的定义及 spanlist 定义

代码语言:javascript代码运行次数:0运行复制
// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
// 管理多个连续页大块内存跨度结构
struct Span
{
    PAGE_ID _pageId = 0; // 大块内存起始页的页号
    size_t  _n = 0;      // 页的数量
    Span* _next = nullptr; // 双向链表的结构
    Span* _prev = nullptr;
    size_t _objSize = 0;  // 切好的小对象的大小
    size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
    void* _freeList = nullptr;  // 切好的小块内存的自由链表
    bool _isUse = false;        // 是否在被使用
};

// 带头双向循环链表
class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_next = _head;
        _head->_prev = _head;
    }

    Span* Begin()
    {
        return _head->_next;
    }

    Span* End()
    {
        return _head;
    }

    bool Empty()
    {
        return _head->_next == _head;
    }

    void PushFront(Span* span)
    {
        Insert(Begin(), span);
    }

    Span* PopFront()
    {
        Span* front = _head->_next;
        Erase(front);
        return front;
    }

    void Insert(Span* pos, Span* newSpan)
    {
        assert(pos);
        assert(newSpan);
        Span* prev = pos->_prev;
        // prev newspan pos
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }

    void Erase(Span* pos)
    {
        assert(pos);
        assert(pos != _head);
        Span* prev = pos->_prev;
        Span* next = pos->_next;
        prev->_next = next;
        next->_prev = prev;
    }

private:
    Span* _head;

public:
    std::mutex _mtx; // 桶锁
};

高并发内存池--page cache

申请内存:

1. 当 central cache 向 page cache 申请内存时, page cache 先检查对应位置有没有 span ,如果没有 则向更大页寻找一个 span ,如果找到则分裂成两个 。比如:申请的是 4 页 page , 4 页 page 后面没有挂span ,则向后面寻找更大的 span ,假设在 10 页 page 位置找到一个 span ,则将 10 页 page span分裂为一个 4 页 page span 和一个 6 页 page span 。 2. 如果找到 _spanList[128] 都没有合适的 span ,则向系统使用 mmap 、 brk 或者是 VirtualAlloc 等方式申请128 页 page span 挂在自由链表中,再重复 1 中的过程。 3. 需要注意的是 central cache 和 page cache 的核心结构都是 spanlist 的哈希桶,但是他们是有本质区别的,central cache 中哈希桶,是按跟 thread cache 一样的大小对齐关系映射的,他的 spanlist中挂的span 中的内存都被按映射关系切好链接成小块内存的自由链表。而 page cache 中的spanlist则是按下标桶号映射的,也就是说第 i 号桶中挂的 span 都是 i 页内存。

释放内存:

如果 central cache 释放回一个 span , 则依次寻找 span 的前后 page id 的没有在使用的空闲 span 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的 span ,减少 内存碎片

PageCache 代码框架:

代码语言:javascript代码运行次数:0运行复制
// 1.page cache是一个以页为单位的span自由链表
// 2.为了保证全局只有唯一的page cache,这个类被设计成了单例模式。
class PageCache
{
public:
    static PageCache* GetInstance()
    {
        return &_sInst;
    }
    
    // 获取从对象到span的映射
    Span* MapObjectToSpan(void* obj);
    
    // 释放空闲span回到Pagecache,并合并相邻的span
    void ReleaseSpanToPageCache(Span* span);
    
    // 获取一个K页的span
    Span* NewSpan(size_t k);

    std::mutex _pageMtx;

private:
    SpanList _spanLists[NPAGES];
    ObjectPool<Span> _spanPool;
    
    // 使用TCMalloc_PageMap1作为页号到Span的映射
    TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

    PageCache()
    {}

    PageCache(const PageCache&) = delete;

    static PageCache _sInst;
};
windows和Linux下如何直接向堆申请页为单位的大块内存:
代码语言:javascript代码运行次数:0运行复制
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
    void* ptr = VirtualAlloc(0, kpage * (1 << PAGE_SHIFT),
                             MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
    if (ptr == nullptr)
        throw std::bad_alloc();
    return ptr;
#else
    // Linux下使用brk或mmap等系统调用进行内存分配
    // 示例使用mmap,实际使用时需要根据具体情况选择
    void* ptr = mmap(nullptr, kpage * (1 << PAGE_SHIFT),
                     PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (ptr == MAP_FAILED)
        throw std::bad_alloc();
    return ptr;
#endif
}

inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else
    // Linux下使用munmap释放内存
    // 注意:如果使用brk分配,则需要使用不同的方法释放
    munmap(ptr, 0); // 0表示释放整个映射区域
#endif
}

多线程并发环境下,对比malloc和ConcurrentAlloc申请和释放内存效率对比

代码语言:javascript代码运行次数:0运行复制
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstdlib> // For malloc and free

// 假设ConcurrentAlloc和ConcurrentFree是您自定义的并发内存分配和释放函数
void* ConcurrentAlloc(size_t size);
void ConcurrentFree(void* ptr);

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime(0);
    std::atomic<size_t> free_costtime(0);

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k]() {
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                auto begin1 = std::chrono::high_resolution_clock::now();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(malloc(16));
                }
                auto end1 = std::chrono::high_resolution_clock::now();

                auto begin2 = std::chrono::high_resolution_clock::now();
                for (size_t i = 0; i < ntimes; i++)
                {
                    free(v[i]);
                }
                auto end2 = std::chrono::high_resolution_clock::now();

                v.clear();

                malloc_costtime += std::chrono::duration_cast<std::chrono::milliseconds>(end1 - begin1).count();
                free_costtime += std::chrono::duration_cast<std::chrono::milliseconds>(end2 - begin2).count();
            }
        });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%zu个线程并发执行%zu轮次,每轮次malloc %zu次: 花费:%zu ms\n",
           nworks, rounds, ntimes, malloc_costtime.load());
    printf("%zu个线程并发执行%zu轮次,每轮次free %zu次: 花费:%zu ms\n",
           nworks, rounds, ntimes, free_costtime.load());
    printf("%zu个线程并发malloc&free %zu次,总计花费:%zu ms\n",
           nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    // ... 与BenchmarkMalloc类似,只是替换malloc和free为ConcurrentAlloc和ConcurrentFree
}

int main()
{
    size_t n = 10000;
    std::cout << "==========================================================" << std::endl;
    BenchmarkConcurrentMalloc(n, 4, 10);
    std::cout << std::endl << std::endl;
    BenchmarkMalloc(n, 4, 10);
    std::cout << "==========================================================" << std::endl;
    return 0;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-03-19,如有侵权请联系 cloudcommunity@tencent 删除对象高并发链表内存线程

本文标签: 高并发内存池并不难