分类

类型:
不限 游戏开发 计算机程序开发 Android开发 网站开发 笔记总结 其他
评分:
不限 10 9 8 7 6 5 4 3 2 1
原创:
不限
年份:
不限 2018 2019

技术文章列表

  • 面试集合剑指offer——重建二叉树

    题目描述输入某二叉树的前序遍历和中序遍历的结果,请重建出该二叉树。假设输入的前序遍历和中序遍历的结果中都不含重复的数字。例如输入前序遍历序列{1,2,4,7,3,5,6,8}和中序遍历序列{4,7,2,1,5,3,8,6},则重建二叉树并返回。
    思路简述根据先序得到根节点(排在第一个),然后根据中序划分出左右子树,不停递归直到得到二叉树。
    具体题解为:
    由先序遍历的规则知:根节点为 {1}由中序遍历规则知:左子树为 {4,7,2}根节点为 {1}右子树为{5,3,8,6}
    我们在左子树和右子树重复刚才操作,
    左子树: 中序遍历为{4,7,2} 先序为{2,4,7}根节点为{2},左子树为{4,7},左子树先序为{4,7},中序为{4,7},所以 {7} 是{4}的右子树
    如右子树:中序遍历为{5,3,8,6} 先序遍历为 {3,5,6,8}{3}为根节点,左子树{5},右子树{6,8} ,其中其先序为{6,8} 中序为{8,6}说明该右子树的左节点为{8},根节点为{6};
    代码实现import java.util.Arrays;public class Solution { public TreeNode reConstructBinaryTree(int[] pre, int[] in) { if (pre == null || in == null) { return null; } if (pre.length == 0 || in.length == 0) { return null; } if (pre.length != in.length) { return null; } TreeNode root = new TreeNode(pre[0]);//第一个 for (int i = 0; i < in.length; i++) { if (pre[0] == in[i]) { //pre的0往后数i个是左子树的,copyofrange包含前面的下标,不包含后面的下标 //in的i往前数i个是左子树的。 root.left = reConstructBinaryTree(Arrays.copyOfRange(pre, 1, i + 1), Arrays.copyOfRange(in, 0, i)); //注意in是从i+1开始,因为i是现在的根,i+1开始才是右子树 root.right = reConstructBinaryTree(Arrays.copyOfRange(pre, i + 1, pre.length), Arrays.copyOfRange(in, i + 1, in.length)); } } return root; }}
    0 留言 2019-02-16 16:43:36 奖励5点积分
  • 基于PsSetCreateProcessNotifyRoutineEx实现监控进程创建并阻止创建

    背景对于内核层实现监控进程的创建或者退出,你可能第一时间会想到 HOOK 内核函数 ZwOpenProcess、ZwTerminateProcess 等。确定,在内核层中的 HOOK 已经给人留下太多深刻的印象了,有 SSDT HOOK、Inline HOOK、IRP HOOK、过滤驱动等等。
    但是,Windows 其实给我们提供现成的内核函数接口,方便我们在内核下监控用户层上进程的创建和退出的情况。即 PsSetCreateProcessNotifyRoutineEx 内核函数,可以设置一个回调函数,来监控进程的创建和退出,同时还能控制是否允许创建进程。
    现在,本文就使用 PsSetCreateProcessNotifyRoutineEx 实现监控进程的创建的实现过程和原理进行整理,形成文档,分享给大家。
    函数介绍PsSetCreateProcessNotifyRoutineEx 函数
    设置进程回调监控进程创建与退出,而且还能控制是否允许进程创建。
    函数声明
    NTSTATUS PsSetCreateProcessNotifyRoutineEx( _In_ PCREATE_PROCESS_NOTIFY_ROUTINE_EX NotifyRoutine, _In_ BOOLEAN Remove);
    参数

    NotifyRoutine [in]指向PCREATE_PROCESS_NOTIFY_ROUTINE_EX例程以注册或删除的指针。 创建新进程时,操作系统将调用此例程。Remove[in]一个布尔值,指定PsSetCreateProcessNotifyRoutineEx是否会从回调例程列表中添加或删除指定的例程。 如果此参数为TRUE,则从回调例程列表中删除指定的例程。 如果此参数为FALSE,则将指定的例程添加到回调例程列表中。 如果删除为TRUE,系统还会等待所有正在运行的回调例程运行完成。
    返回值

    成功,则返回 STATUS_SUCCESS;否则,返回其它失败错误码 NTSTATUS。

    PCREATE_PROCESS_NOTIFY_ROUTINE_EX 回调函数
    函数声明
    PCREATE_PROCESS_NOTIFY_ROUTINE_EX SetCreateProcessNotifyRoutineEx;void SetCreateProcessNotifyRoutineEx( _In_ HANDLE ParentId, _In_ HANDLE ProcessId, _Inout_opt_ PPS_CREATE_NOTIFY_INFO CreateInfo){ ... }
    参数

    ParentId [in]父进程的进程ID。ProcessId [in]进程的进程ID。CreateInfo [in,out,optional]指向PS_CREATE_NOTIFY_INFO结构的指针,其中包含有关新进程的信息。为 NULL 时,表示进程退出;不为 NULL 时,表示进程创建。
    返回值

    无返回值。

    PS_CREATE_NOTIFY_INFO 结构体
    typedef struct _PS_CREATE_NOTIFY_INFO { SIZE_T Size; union { ULONG Flags; struct { ULONG FileOpenNameAvailable :1; ULONG IsSubsystemProcess :1; ULONG Reserved :30; }; }; HANDLE ParentProcessId; CLIENT_ID CreatingThreadId; struct _FILE_OBJECT *FileObject; PCUNICODE_STRING ImageFileName; PCUNICODE_STRING CommandLine; NTSTATUS CreationStatus;} PS_CREATE_NOTIFY_INFO, *PPS_CREATE_NOTIFY_INFO;
    成员

    Size该结构的大小(以字节为单位)。 Flags保留。 请改用FileOpenNameAvailable成员。FileOpenNameAvailable一个布尔值,指定ImageFileName成员是否包含用于打开进程可执行文件的确切文件名。
    IsSubsystemProcess指示进程子系统类型的布尔值是Win32以外的子系统。
    Reserved保留供系统使用。
    ParentProcessId新进程的父进程的进程ID。 请注意,父进程不一定与创建新进程的进程相同。 新进程可以继承父进程的某些属性,如句柄或共享内存。 (进程创建者的进程ID由CreatingThreadId-> UniqueProcess给出。)
    CreatingThreadId创建新进程的进程和线程的进程ID和线程ID。 CreatingThreadId-> UniqueProcess包含进程ID,而CreatingThreadId-> UniqueThread包含线程ID。
    FileObject指向进程可执行文件的文件对象的指针。如果IsSubsystemProcess为TRUE,则此值可能为NULL。
    ImageFileName指向保存可执行文件的文件名的UNICODE_STRING字符串的指针。 如果FileOpenNameAvailable成员为TRUE,则该字符串指定用于打开可执行文件的确切文件名。 如果FileOpenNameAvailable为FALSE,则操作系统可能仅提供部分名称。如果IsSubsystemProcess为TRUE,则此值可能为NULL。
    CommandLine指向UNICODE_STRING字符串的指针,该字符串保存用于执行该过程的命令。 如果命令不可用,CommandLine为NULL。如果IsSubsystemProcess为TRUE,则此值可能为NULL。
    CreationStatus用于进程创建操作返回的NTSTATUS值。 驱动程序可以将此值更改为错误代码,以防止创建进程。


    实现原理破解 PsSetCreateProcessNotifyRoutineEx 函数的使用限制第一种方法在讲解怎么使用 PsSetCreateProcessNotifyRoutineEx 函数来注册回调之前,先来讲解下 Windows 对这个函数做的限制:驱动程序必须有数字签名才能使用此函数。经逆向研究,内核通过 MmVerifyCallbackFunction 验证此回调是否合法, 但此函数只是简单的验证了一下 DriverObject->DriverSection->Flags 的值是不是为 0x20:
    nt!MmVerifyCallbackFunction+0x75: fffff800`01a66865 f6406820 test byte ptr [rax+68h],20h fffff800`01a66869 0f45fd cmovne edi,ebp
    所以破解方法非常简单,只要把 DriverObject->DriverSection->Flags 的值按位或 0x20 即可。其中,DriverSection 是指向 LDR_DATA_TABLE_ENTRY 结构的值,要注意该结构在 32 位和 64 位系统下的定义。
    // 注意32位与64位的对齐大小#ifndef _WIN64 #pragma pack(1) #endiftypedef struct _LDR_DATA_TABLE_ENTRY{ LIST_ENTRY InLoadOrderLinks; LIST_ENTRY InMemoryOrderLinks; LIST_ENTRY InInitializationOrderLinks; PVOID DllBase; PVOID EntryPoint; ULONG SizeOfImage; UNICODE_STRING FullDllName; UNICODE_STRING BaseDllName; ULONG Flags; USHORT LoadCount; USHORT TlsIndex; union { LIST_ENTRY HashLinks; struct { PVOID SectionPointer; ULONG CheckSum; }; }; union { ULONG TimeDateStamp; PVOID LoadedImports; }; PVOID EntryPointActivationContext; PVOID PatchInformation; LIST_ENTRY ForwarderLinks; LIST_ENTRY ServiceTagLinks; LIST_ENTRY StaticLinks;} LDR_DATA_TABLE_ENTRY, *PLDR_DATA_TABLE_ENTRY;#ifndef _WIN64 #pragma pack()#endif
    第二种方法使用此函数, 一定要设置 IMAGE_OPTIONAL_HEADER 中的 DllCharacterisitics 字段设置为:IMAGE_DLLCHARACTERISITICS_FORCE_INTEGRITY 属性,该属性是一个驱动强制签名属性。使用 VS2013 开发环境设置方式是:

    右击项目,选择属性
    选中配置属性中的链接器,点击命令行
    在其它选项中输入: /INTEGRITYCHECK 表示设置; /INTEGRITYCHECK:NO 表示不设置

    这样,设置之后,驱动程序必须要进行驱动签名才可正常运行!
    创建回调并监控进程创建我们根据上面的函数介绍,大概知道实现的流程了吧。对于设置回调函数,直接调用 PsSetCreateProcessNotifyRoutineEx 函数来设置就好。传入设置的回调函数名称以及删除标志参数设置为 FALSE,表示创建回调函数。这样,就可以成功设置进程监控的回调函数了。
    那么,我们的回调函数也并不复杂,它的函数声明为:
    void SetCreateProcessNotifyRoutineEx( _In_ HANDLE ParentId, _In_ HANDLE ProcessId, _Inout_opt_ PPS_CREATE_NOTIFY_INFO CreateInfo);
    回调函数的名称可以任意,但是返回值类型以及函数参数类型必须是固定的,不能变更。回调函数的第一个参数 ParentId 表示父进程ID,第二个参数 ProcessId 表示进程ID,第三个参数 CreateInfo 为 NULL 时,表示进程退出;不为 NULL 时,表示进程创建。那么,创建进程的信息就存储在 PS_CREATE_NOTIFY_INFO 结构体中。
    我们可以从 PS_CREATE_NOTIFY_INFO 中获取进程名称、路径、命令行、PID等进程信息。同时,可以通过设置成员 CreationStatus 的值来控制进程是否创建。当 CreationStatus 的值为 STATUS_SUCCESS 表示创建进程,否则,不创建进程。例如不创建进程的时候,CreationStatus 可以为 STATUS_UNSUCCESSFUL 错误码。
    当我们要删除回调设置的时候,只需要调用 PsSetCreateProcessNotifyRoutineEx 函数,传入回调函数名称以及删除标志参数设置为 TRUE。这样,就可以成功删除设置的回调函数了。
    编码实现编程方式绕过签名检查// 编程方式绕过签名检查BOOLEAN BypassCheckSign(PDRIVER_OBJECT pDriverObject){#ifdef _WIN64 typedef struct _KLDR_DATA_TABLE_ENTRY { LIST_ENTRY listEntry; ULONG64 __Undefined1; ULONG64 __Undefined2; ULONG64 __Undefined3; ULONG64 NonPagedDebugInfo; ULONG64 DllBase; ULONG64 EntryPoint; ULONG SizeOfImage; UNICODE_STRING path; UNICODE_STRING name; ULONG Flags; USHORT LoadCount; USHORT __Undefined5; ULONG64 __Undefined6; ULONG CheckSum; ULONG __padding1; ULONG TimeDateStamp; ULONG __padding2; } KLDR_DATA_TABLE_ENTRY, *PKLDR_DATA_TABLE_ENTRY;#else typedef struct _KLDR_DATA_TABLE_ENTRY { LIST_ENTRY listEntry; ULONG unknown1; ULONG unknown2; ULONG unknown3; ULONG unknown4; ULONG unknown5; ULONG unknown6; ULONG unknown7; UNICODE_STRING path; UNICODE_STRING name; ULONG Flags; } KLDR_DATA_TABLE_ENTRY, *PKLDR_DATA_TABLE_ENTRY;#endif PKLDR_DATA_TABLE_ENTRY pLdrData = (PKLDR_DATA_TABLE_ENTRY)pDriverObject->DriverSection; pLdrData->Flags = pLdrData->Flags | 0x20; return TRUE;}
    设置回调// 设置回调函数NTSTATUS SetProcessNotifyRoutine(){ NTSTATUS status = PsSetCreateProcessNotifyRoutineEx((PCREATE_PROCESS_NOTIFY_ROUTINE_EX)ProcessNotifyExRoutine, FALSE); if (!NT_SUCCESS(status)) { ShowError("PsSetCreateProcessNotifyRoutineEx", status); } return status;}
    回调函数// 回调函数VOID ProcessNotifyExRoutine(PEPROCESS pEProcess, HANDLE hProcessId, PPS_CREATE_NOTIFY_INFO CreateInfo){ // CreateInfo 为 NULL 时,表示进程退出;不为 NULL 时,表示进程创建 if (NULL == CreateInfo) { return; } // 获取进程名称 PCHAR pszImageFileName = PsGetProcessImageFileName(pEProcess); // 显示创建进程信息 DbgPrint("[%s][%d][%wZ]\n", pszImageFileName, hProcessId, CreateInfo->ImageFileName); // 禁止指定进程(520.exe)创建 if (0 == _stricmp(pszImageFileName, "520.exe")) { // 禁止创建 CreateInfo->CreationStatus = STATUS_UNSUCCESSFUL; DbgPrint("[禁止创建]\n"); }}
    删除回调// 删除回调函数NTSTATUS RemoveProcessNotifyRoutine(){ NTSTATUS status = PsSetCreateProcessNotifyRoutineEx((PCREATE_PROCESS_NOTIFY_ROUTINE_EX)ProcessNotifyExRoutine, TRUE); if (!NT_SUCCESS(status)) { ShowError("PsSetCreateProcessNotifyRoutineEx", status); } return status;}
    程序测试在 Win7 32 位系统下,驱动程序正常执行:

    在 Win10 64 位系统下,驱动程序正常执行:

    总结这个程序实现起来并不复杂,关键是对 PsSetCreateProcessNotifyRoutineEx 函数要理解透彻,理解清楚回调函数中,PS_CREATE_NOTIFY_INFO 结构体的所有成员含义。这样,我们就可以获取进程信息,以及控制进程的创建。
    注意,破解 PsSetCreateProcessNotifyRoutineEx 函数的使用限制有两种方式,一种是通过编程来解决;一种是通过 VS 开发环境和数字签名来解决。
    1 留言 2019-02-16 14:36:44 奖励8点积分
  • Minifilter驱动程序与用户层程序通信 精华

    背景通常 NT 驱动程序与用户层间的通信,可以由用户层调用 CreateFile 函数打开驱动设备并获取设备句柄,然后调用 DeviceIoControl 函数实现用户层数据和内核层数据的交互。
    那么,对于 Minifilter,它是一个 WDM 驱动,它并不像 NT 驱动那样使用常用的方式通信,而是有自己一套函数专门用于数据通信交互。现在,我就把程序的实现过程和原理整理成文档,分享给大家。
    实现过程用户层程序的实现过程导入库文件我们先来介绍下用户层上的程序的实现过程。首先,我们需要包含头文件 fltUser.h 以及库文件 fltLib.lib,这些文件在 VS 中并没有,它们存在于 WDK 中。我们可以设置程序的目录包含路径以及库文件包含路径,也可以将 WDK 中这两个文件拷贝到当前目录中来。我们选择后一种方法,将下面目录下的文件拷贝到当前目录中:

    C:\Program Files (x86)\Windows Kits\8.1\Include\um\fltUser.h
    C:\Program Files (x86)\Windows Kits\8.1\Lib\winv6.3\um\x86\fltLib.lib
    C:\Program Files (x86)\Windows Kits\8.1\Lib\winv6.3\um\x64\fltLib.lib

    那么,我们在程序中声明头文件以及导入库文件的代码为:
    #include "flt\\fltUser.h"#ifdef _WIN32 #pragma comment(lib, "flt\\lib\\x86\\fltLib.lib")#else #pragma comment(lib, "flt\\lib\\x64\\fltLib.lib")#endif
    调用函数实现交互用户层上实现于 Minifilter 内核层的数据交互方法,和用户层与 NT 驱动程序的交互方法很相似,虽然不是 CreateFile 打开对象获取句柄,在调用 DeviceIoControl 交互数据。具体的实现步骤如下:

    首先,调用 FilterConnectCommunicationPort 函数打开通信端口,获取端口的句柄
    然后,调用 FilterSendMessage 函数交互数据,向内核程序传入输入、输出缓冲区
    当交互结束,通信句柄不再使用的时候,调用 CloseHandle 函数关闭句柄

    综合上面 3 个步骤来看,是不是和 NT 驱动程序的交互方式很相似呢?我们通过类比记忆就好。其中,Minifilter 是通过端口的方式来实现数据交互的。具体的实现代码如下所示:
    int _tmain(int argc, _TCHAR* argv[]){ HANDLE hPort = NULL; char szInputBuf[MAX_PATH] = "From User Test!"; char szOutputBuf[MAX_PATH] = { 0 }; DWORD dwInputLen = 1 + ::lstrlen(szInputBuf); DWORD dwOutputLen = MAX_PATH; DWORD dwRet = 0; HRESULT hRet = NULL; // 打开并连接端口, 获取端口句柄. (类似CreateFile) hRet = ::FilterConnectCommunicationPort(PORT_NAME, 0, NULL, 0, NULL, &hPort); if (IS_ERROR(hRet)) { ::MessageBox(NULL, "FilterConnectCommunicationPort", NULL, MB_OK); return 1; } // 向端口发送数据. (类似 DeviceIoControl) hRet = ::FilterSendMessage(hPort, szInputBuf, dwInputLen, szOutputBuf, dwOutputLen, &dwRet); // 类似DeviceIoControl if (IS_ERROR(hRet)) { ::MessageBox(NULL, "FilterSendMessage", NULL, MB_OK); return 2; } // 显示数据 printf("InputBuffer:0x%x\n", szInputBuf); printf("OutputBuffer:0x%x\n", szOutputBuf); system("pause"); return 0;}
    内核层程序的实现过程从上面用户层程序的实现过程来看,和通常的交互方式来看,没有什么大区别,只是调用的函数变了而已。但是,对于内核层,却有很大的改变。
    我们知道,VS2013 里面有向导可以直接创建一个 Minifilter 驱动,可以生成代码框架和 inf 文件,这简化了很多工作。但是,VS2013 开发化境并没有帮我们生成与用户层通信部分的代码,所以,需要我们手动对代码进行更改,实现与用户层的数据通信。具体的步骤如下:
    1.首先,在内核程序的顶头声明 2 个全局变量,保存通信用的服务器端口以及客户端端口;并且声明 3 个回调函数:建立连接回调函数、数据通信回调函数、断开连接回调函数。
    // 端口名称#define PORT_NAME L"\\CommPort"// 服务器端口PFLT_PORT g_ServerPort;// 客户端端口PFLT_PORT g_ClientPort;// 建立连接回调函数NTSTATUS ConnectNotifyCallback( IN PFLT_PORT ClientPort, IN PVOID ServerPortCookies, IN PVOID ConnectionContext, IN ULONG SizeOfContext, OUT PVOID *ConnectionPortCokkie);// 数据通信回调函数NTSTATUS MessageNotifyCallback( IN PVOID PortCookie, IN PVOID InputBuffer OPTIONAL, IN ULONG InputBufferLength, OUT PVOID OutputBuffer, IN ULONG OutputBufferLength, OUT PULONG ReturnOutputBufferLength);// 断开连接回调函数VOID DisconnectNotifyCallback(_In_opt_ PVOID ConnectionCookie);
    2.然后,我们来到 DriverEntry 入口点函数,进行修改:

    首先,调用 FltRegisterFilter 注册过滤器
    然后,在使用 FltCreateCommunicationPort 函数创建通信端口之前,需要调用 FltBuildDefaultSecurityDescriptor 函数创建一个默认的安全描述符。其中,FLT_PORT_ALL_ACCESS 表示程序拥有连接到端口、访问端口等所有权限。其中,Minifilter 通常在调用 FltCreateCommunicationPort 函数之前会调用 FltBuildDefaultSecurityDescriptor 函数;在调用完 FltCreateCommunicationPort 函数后,会调用 FltFreeSecurityDescriptor 函数
    接着,调用 FltCreateCommunicationPort 创建通信服务器端口,使得Minifilter 驱动程序可以接收来自用户层程序的连接请求。可以通过该函数设置端口名称、建立连接回调函数、数据通信回调函数、断开连接回调函数、最大连接数等,同时可以获取服务器端口句柄
    然后,调用 FltFreeSecurityDescriptor 函数释放安全描述符
    最后,调用 FltStartFiltering 函数开始启动过滤注册的 Minifilter 驱动程序

    NTSTATUS DriverEntry ( _In_ PDRIVER_OBJECT DriverObject, _In_ PUNICODE_STRING RegistryPath ){ NTSTATUS status; UNREFERENCED_PARAMETER( RegistryPath ); PT_DBG_PRINT( PTDBG_TRACE_ROUTINES, ("Minifilter_Communicate_Test!DriverEntry: Entered\n") ); // // Register with FltMgr to tell it our callback routines // status = FltRegisterFilter( DriverObject, &FilterRegistration, &gFilterHandle ); FLT_ASSERT( NT_SUCCESS( status ) ); if (NT_SUCCESS( status )) { PSECURITY_DESCRIPTOR lpSD = NULL; // 创建安全描述, 注意:要创建这个安全描述,否则不能成功通信 status = FltBuildDefaultSecurityDescriptor(&lpSD, FLT_PORT_ALL_ACCESS); if (!NT_SUCCESS(status)) { KdPrint(("FltBuildDefaultSecurityDescriptor Error[0x%X]", status)); return status; } // 创建于用户层交互的端口 UNICODE_STRING ustrCommPort; OBJECT_ATTRIBUTES objectAttributes; RtlInitUnicodeString(&ustrCommPort, PORT_NAME); InitializeObjectAttributes(&objectAttributes, &ustrCommPort, OBJ_CASE_INSENSITIVE | OBJ_KERNEL_HANDLE, NULL, lpSD); status = FltCreateCommunicationPort(gFilterHandle, &g_ServerPort, &objectAttributes, NULL, ConnectNotifyCallback, DisconnectNotifyCallback, MessageNotifyCallback, 1); if (!NT_SUCCESS(status)) { KdPrint(("FltCreateCommunicationPort Error[0x%X]", status)); return status; } // 释放安全描述 FltFreeSecurityDescriptor(lpSD); // // Start filtering i/o // status = FltStartFiltering( gFilterHandle ); if (!NT_SUCCESS( status )) { FltUnregisterFilter( gFilterHandle ); } } return status;}
    其中,建立连接回调函数的代码为:
    NTSTATUS ConnectNotifyCallback( IN PFLT_PORT ClientPort, IN PVOID ServerPortCookies, IN PVOID ConnectionContext, IN ULONG SizeOfContext, OUT PVOID *ConnectionPortCokkie){ PAGED_CODE(); UNREFERENCED_PARAMETER(ServerPortCookies); UNREFERENCED_PARAMETER(ConnectionContext); UNREFERENCED_PARAMETER(SizeOfContext); UNREFERENCED_PARAMETER(ConnectionPortCokkie); // 可以加以判断,禁止非法的连接,从而给予保护 g_ClientPort = ClientPort; // 保存以供以后使用 return STATUS_SUCCESS;}
    只要有连接连接到端口上,就会调用此函数。我们可以在该回调函数中获取客户端的端口句柄。这个客户端端口句柄要保存下来,这样,我们的驱动程序才可以和建立连接的用户层程序使用该客户端句柄进行数据通信。
    其中,断开连接回调函数的代码为:
    VOID DisconnectNotifyCallback(_In_opt_ PVOID ConnectionCookie){ PAGED_CODE(); UNREFERENCED_PARAMETER(ConnectionCookie); // 应该加判断,如果ConnectionCookie == 我们的值就执行这行 FltCloseClientPort(gFilterHandle, &g_ClientPort);}
    每当有连接断开的时候,就会调用该函数。我们需要在此调用 FltCloseClientPort 函数,关闭客户端端口。
    其中,数据交互回调函数的代码为:
    NTSTATUS MessageNotifyCallback( IN PVOID PortCookie, IN PVOID InputBuffer OPTIONAL, IN ULONG InputBufferLength, OUT PVOID OutputBuffer, IN ULONG OutputBufferLength, OUT PULONG ReturnOutputBufferLength){ /* 这里要注意: 1.数据地址的对齐. 2.文档建议使用:try/except处理. 3.如果是64位的驱动要考虑32位的EXE发来的请求. */ NTSTATUS status = STATUS_SUCCESS; PAGED_CODE(); UNREFERENCED_PARAMETER(PortCookie); /* 这里输入、输出的地址均是用户空间的地址!!! */ // 显示用户传输来的数据 KdPrint(("[InputBuffer][0x%X]%s\n", InputBuffer, (PCHAR)InputBuffer)); KdPrint(("[OutputBuffer][0x%X]\n", OutputBuffer)); // 返回内核数据到用户空间 CHAR szText[] = "From Kernel Data!"; RtlCopyMemory(OutputBuffer, szText, sizeof(szText)); *ReturnOutputBufferLength = sizeof(szText); return status;}
    每当有数据交互的时候,就会调用此回调函数。我们可以从输入缓冲区中获取来自用户层程序传入的数据。然后对输出缓冲区进行设置,将内核数据输出到用户层中。这个函数和 NT 驱动程序中的 IRP_MJ_DEVICE_CONTRL 消息对应的操作函数类似。
    3.当驱动卸载的时候,要在卸载函数中调用
    // 没有这一行是停止不了驱动的,查询也是永远等待中FltCloseCommunicationPort(g_ServerPort);
    否则,停止不了驱动的,查询也是永远等待中。
    程序测试在 Win7 32 位系统下,驱动程序正常执行:

    在 Win10 64 位系统下,驱动程序正常执行:

    总结Minifilter 的通讯结构不难理解,注意和 NT 驱动程序的驱动结构进行类比理解就好。
    要注意该程序的加载,并不像 NT 驱动那样,调用加载程序来加载。WDM驱动,采用 inf 文件的安装方式,但是,一定要注意:MiniFilter生成后,一定要修改 inf中的 Instance1.Altitude = “370030”,即将注释去掉即可。因为每一个 Minifilter 驱动都必须指定一个 Altitude。每一个发组都有自己的一个 Altitude 区间,Altitude 值越高,代表在设备栈里面的位置也越高,也就是越先收到应用层发过来的IRP。
    inf 文件安装驱动方式:

    选中inf文件,鼠标右键,选择“安装”
    安装完毕后,以管理员权限打开cmd,输入“net start 服务名”启动服务
    停止服务则使用命令“net stop 服务名”即可

    同时要注意,程序在判断文件路径的时候,要使用 ExAllocatePool 申请非分页内存,不要直接使用变量,因为使用 FltGetFileNameInformation 获取的路径信息是存储在分页内存中,直接在回调函数中使用会导致蓝屏情况。
    1 留言 2019-02-15 17:40:50 奖励12点积分
  • 面试集合剑指offer——二叉树中和为某一值的路径

    题目描述输入一颗二叉树的根节点和一个整数,打印出二叉树中结点值的和为输入整数的所有路径。路径定义为从树的根结点开始往下一直到叶结点所经过的结点形成一条路径。(注意: 在返回值的list中,数组长度大的数组靠前)
    思路简述一个链表记录路径,一个存放这个链表的链表记录最终的结果。

    首先将根节点放入链表,target减去这个根节点判断是否target同时是叶子节点,如果是就将当前的链表放在结果链表里如果不是,就递归去访问左右子节点。无论是找到没有,都要回退一步
    代码实现import java.util.ArrayList;public class Solution {private ArrayList<ArrayList<Integer>> listAll = new ArrayList<ArrayList<Integer>>(); private ArrayList<Integer> list =new ArrayList<Integer>(); private ArrayList<ArrayList<Integer>> resultList = new ArrayList<ArrayList<Integer>>(); public ArrayList<ArrayList<Integer>> FindPath(TreeNode root,int target) { if(root == null) return resultList; list.add(root.val); target = target - root.val; if(target == 0 && root.left == null && root.right == null){ resultList.add(new ArrayList<Integer>(list)); } else { FindPath(root.left, target); FindPath(root.right, target); } // 在返回父节点之前,在路径上删除该结点 list.remove(list.size()-1); return resultList; }}
    0 留言 2019-02-15 13:59:22 奖励5点积分
  • 面试集合剑指offer——数据流中位数

    题目描述如何得到一个数据流中的中位数?如果从数据流中读出奇数个数值,那么中位数就是所有数值排序之后位于中间的数值。如果从数据流中读出偶数个数值,那么中位数就是所有数值排序之后中间两个数的平均值。(要求使用二叉树的思想)
    思路简述想到二叉树中和顺序有关的就是大小根堆和大小顶堆,我们要做的就是想出如何和题目结合在一起。
    大顶堆原理:根结点(亦称为堆顶)的关键字是堆里所有结点关键字中最大者,称为大顶堆。
    大根堆要求根节点的关键字既大于或等于左子树的关键字值,又大于或等于右子树的关键字值。
    小顶堆原理:根结点(亦称为堆顶)的关键字是堆里所有结点关键字中最小者,称为小顶堆。
    小根堆要求根节点的关键字既小于或等于左子树的关键字值,又小于或等于右子树的关键字值。
    终上所述,我们不需要考虑除了中位数之外其他数的顺序,所以,选用大小顶堆就可以了。创建优先级队列维护大顶堆和小顶堆两个堆,并且小顶堆的值都大于大顶堆的值。比如6,1,3,0,9,8,7,2则较小的部分大根堆是0,1,2,3 较大的部分小根堆是6,7,8,9。
    具体步骤:
    保证两个堆的尺寸差距最大为1,采用奇数个时候插到大顶堆,偶数个插到小顶堆。当数据总数为偶数时,新加入的元素,应当进入小顶堆(注意不是直接进入小顶堆,而是经大顶堆筛选后取大顶堆中最大元素进入小顶堆),要保证小顶堆里面所有数都比大顶堆的大。当数据为奇数个时,按照相应的调整进入大顶堆。如果个数为奇数个,则大顶堆堆顶为中位数,否则就是两个堆顶除以2.比如新加入三个,那么第一个在大,第二个在小,第三个可能在大。所以就是大顶堆的堆顶。
    代码实现import java.util.Comparator;import java.util.PriorityQueue;public class Shujuliumedian { int count = 0; PriorityQueue<Integer> minheap = new PriorityQueue<>(); PriorityQueue<Integer> maxheap = new PriorityQueue<>(11, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { // TODO Auto-generated method stub return o2.compareTo(o1);//o2大于o1返回1 ,否则返回-1 } }); public void Insert(Integer num) { count++; if (count % 2 ==0) {//偶数进入小顶堆,这个其实无所谓,定了一个平均分配的规则 //保证进入小顶堆的元素要比大顶堆最大的大,所以如果小调整 if (!maxheap.isEmpty() && num < maxheap.peek()) { maxheap.offer(num); num = maxheap.poll(); } minheap.offer(num); } else {//奇数进入大顶堆 if (!minheap.isEmpty() && num > minheap.peek()) { minheap.offer(num); num = minheap.poll(); } maxheap.offer(num); } } public Double GetMedian() { double median = 0; if (count % 2 ==1) { median = maxheap.peek(); } else median = (minheap.peek()+maxheap.peek())/2.0; return median; }}
    1 留言 2019-02-14 13:52:38
  • NT驱动程序与用户层程序基于事件EVENT实现同步通信

    背景之前我们在《NT驱动程序与用户层程序通信》这篇文章中讲了,用户层程序使用 DeviceIoControl 将 IOCTL 控制码、输入缓冲区、输出缓冲区传入到内核;内核响应 IRP_MJ_DEVICE_CONTRL 消息,并从 IRP 中获取传入的 IOCTL 控制码、输入缓冲区、输出缓冲区,以此实现数据的交互。
    但是,当内核层想主动传递数据到用户层,用户层又怎样才能知道呢?因为只有用户层知道内核层有数据输出的时候,它才会调用 DeviceIoControl 函数去获取数据。所以,本文要介绍的就是基于事件 EVENT 实现的同步框架,可以解决这个的问题。现在,我就把实现思路和原理整理成文档,分享给大家。
    函数介绍CreateEvent 函数
    CreateEvent是一个Windows API函数。它用来创建或打开一个命名的或无名的事件对象。如果想为对象指定一个访问掩码,应当使用CreateEventEx函数。
    函数声明
    HANDLECreateEvent( LPSECURITY_ATTRIBUTESlpEventAttributes, // 安全属性 BOOLbManualReset, // 复位方式 BOOLbInitialState, // 初始状态 LPCTSTRlpName // 对象名称);
    参数

    lpEventAttributes[in]一个指向SECURITY_ATTRIBUTES结构的指针,确定返回的句柄是否可被子进程继承。如果lpEventAttributes是NULL,此句柄不能被继承。bManualReset[in]指定将事件对象创建成手动复原还是自动复原。如果是TRUE,那么必须用ResetEvent函数来手工将事件的状态复原到无信号状态。如果设置为FALSE,当一个等待线程被释放以后,系统将会自动将事件状态复原为无信号状态。bInitialState[in]指定事件对象的初始状态。如果为TRUE,初始状态为有信号状态;否则为无信号状态。lpName[in]指定事件的对象的名称,是一个以0结束的字符串指针。名称的字符格式限定在MAX_PATH之内。名字是对大小写敏感的。如果lpName为NULL,将创建一个无名的事件对象。
    返回值

    如果函数调用成功,函数返回事件对象的句柄。如果函数失败,函数返回值为NULL,如果需要获得详细的错误信息,需要调用GetLastError。

    ObReferenceObjectByHandle 函数
    提供对象句柄访问许可。如果访问被允许,返回相应的对象体的指针。
    函数声明
    NTSTATUS ObReferenceObjectByHandle( _In_ HANDLE Handle, _In_ ACCESS_MASK DesiredAccess, _In_opt_ POBJECT_TYPE ObjectType, _In_ KPROCESSOR_MODE AccessMode, _Out_ PVOID *Object, _In_opt_ POBJECT_HANDLE_INFORMATION HandleInformation);
    参数

    Handle [in]为一个对象指定一个打开的句柄。
    DesiredAccess [in]指定访问对象的类型。其中,EVENT_MODIFY_STATE 表示允许使用 KeSetEvent 和 KeResetEvent 函数。
    ObjectType [in, optional]表明指向对象是什么类型的。其中,*ExEventObjectType 表示对象指针类型为 PKEVENT。
    AccessMode [in]访问模式分UserMode 和KernelMode。其中,KernelMode 表示内核模式。
    Object [out]指向映射句柄对象的指针。
    HandleInformation [out, optional]
    驱动程序设置为 NULL。

    返回值

    成功,则返回 STATUS_SUCCESS;否则,返回其它 NTSTATUS 错误码。

    KeSetEvent 函数
    如果事件尚未发出信号,则 KeSetEvent 函数将事件对象设置为信号状态,并返回事件对象的先前状态。
    函数声明
    LONG KeSetEvent( _Inout_ PRKEVENT Event, _In_ KPRIORITY Increment, _In_ BOOLEAN Wait);参数

    Event[in,out]指向调用者为其提供存储的初始化事件对象的指针。Increment[in]如果设置事件导致等待满足,则指定要应用的优先级增量。其中,IO_NO_INCREMENT 表示不增加优先级。Wait指定是否通过调用 KeWaitXxx 函数之一来立即跟踪对KeSetEvent的调用。 如果为TRUE,则KeSetEvent调用之后必须调用KeWaitForMultipleObjects,KeWaitForMutexObject或KeWaitForSingleObject。 有关详细信息,请参阅以下备注部分。
    返回值

    如果事件对象的先前状态发出信号,则返回非零值。

    实现原理我们通过事件 EVENT 实现用户层与内核层的同步操作,具体的实现原理如下:

    首先,我们在用户层程序中调用 CreateEvent 函数创建事件 EVENT 并获取事件 EVENT 的句柄。事件初始状态为无信号,而且自动复原。
    然后,用户层程序调用 DeviceIoControl 函数将上述创建的 EVENT 句柄传递到内核层驱动程序中,并调用 WaitForSingleObject 函数等待事件 EVENT 的响应。直到事件 EVENT 响应后,程序才会进行下一步操作。
    接着,内核层程序就通过 IRP_MJ_DEVICE_CONTROL 消息响应函数获取从用户层传入的事件 EVENT 的句柄。调用 ObReferenceObjectByHandle 内核函数获取内核事件 EVENT 对象。
    然后,内核驱动程序可以调用 PsCreateSystemThread 创建多线程,继续执行操作。要想通知用户层程序进行下一步操作,只需调用 KeSetEvent 内核函数,将事件 EVENT 对象的状态设置为信号状态。那么,用户层程序中的事件 EVENT 就是一个有信号状态,WaitForSingleObject 函数就不会阻塞,而是继续往下执行。这样,就可以成功从内核层通知到用户层进行操作了。
    最后,用户层调用 CloseHandle 关闭事件 EVENT 句柄;内核层调用 ObDereferenceObject 释放内核事件 EVENT 对象。

    这个框架的核心原理就是,将用户层的事件句柄传入内核层的驱动程序中,并有内核驱动程序设置事件对象的信号状态,以此触发用户层的响应。
    编码实现用户层代码int _tmain(int argc, _TCHAR* argv[]){ HANDLE hEvent = NULL; HANDLE hDevice = NULL; char szOutput[MAX_PATH] = { 0 }; DWORD dwOutput = MAX_PATH; DWORD dwRet = 0; BOOL bRet = FALSE; // 创建事件, 设置自动复位,初始状态为无信号 hEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL); if (NULL == hEvent) { printf("CreateEvent Error[%d]\n", ::GetLastError()); } // 打开设备 hDevice = ::CreateFile(SYM_NAME, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE, NULL); if (INVALID_HANDLE_VALUE == hDevice) { printf("CreateFile Error[%d]\n", ::GetLastError()); } // 数据交互, 向内核层中传入事件句柄 bRet = ::DeviceIoControl(hDevice, IOCTL_MY_TEST, &hEvent, sizeof(hEvent), szOutput, dwOutput, &dwRet, NULL); if (FALSE == bRet) { printf("DeviceIoControl Error[%d]\n", ::GetLastError()); } // 一直等待事件的响应 ::WaitForSingleObject(hEvent, INFINITE); // 数据交互, 从内核层中获取数据 bRet = ::DeviceIoControl(hDevice, IOCTL_MY_OUTPUT, NULL, 0, szOutput, dwOutput, &dwRet, NULL); if (FALSE == bRet) { printf("DeviceIoControl Error[%d]\n", ::GetLastError()); } // 显示 printf("[From Kernel Output]%s\n", szOutput); // 关闭设备句柄 ::CloseHandle(hEvent); ::CloseHandle(hDevice); system("pause"); return 0;}
    内核层代码IRP_MJ_DEVICECONTROL 消息处理函数// IRP_MJ_DEVICE_CONTROL 消息处理函数NTSTATUS DriverControlHandle(PDEVICE_OBJECT pDevObj, PIRP pIrp){ NTSTATUS status = STATUS_SUCCESS; // 获取当前 IRP 栈空间数据 PIO_STACK_LOCATION pIoStackLocation = IoGetCurrentIrpStackLocation(pIrp); // 获取输入/输出缓冲区 PVOID pBuffer = pIrp->AssociatedIrp.SystemBuffer; // 获取输入缓冲区数据长度 ULONG ulInputLength = pIoStackLocation->Parameters.DeviceIoControl.InputBufferLength; // 获取输出缓冲区数据长度 ULONG ulOutputLength = pIoStackLocation->Parameters.DeviceIoControl.OutputBufferLength; // 实际输出数据长度 ULONG ulInfo = 0; // 获取控制码 ULONG ulControlCode = pIoStackLocation->Parameters.DeviceIoControl.IoControlCode; // 根据操作码分别进行操作 switch (ulControlCode) { case IOCTL_MY_TEST: { // 获取传入的事件句柄 HANDLE hUserEvent = *(HANDLE *)pBuffer; // 处理类型32位、64位下类型不匹配的情况 if (4 == ulInputLength) { hUserEvent = (HANDLE)((SIZE_T)hUserEvent & 0x0FFFFFFFF); } // 根据事件句柄获取内核事件对象 status = ObReferenceObjectByHandle(hUserEvent, EVENT_MODIFY_STATE, *ExEventObjectType, KernelMode, (PVOID)(&g_pKernelEvent), NULL); if (!NT_SUCCESS(status)) { DbgPrint("ObReferenceObjectByHandle Error[0x%X]\n", status); g_pKernelEvent = NULL; break; } // 创建多线程, 执行操作, 执行完毕后, 发送事件通知用户层 HANDLE hThread = NULL; PsCreateSystemThread(&hThread, 0, NULL, NtCurrentProcess(), NULL, ThreadProc, g_pKernelEvent); break; } case IOCTL_MY_OUTPUT: { RtlCopyMemory(pBuffer, g_szOutputBuffer, 50); ulInfo = 50; break; } default: break; } pIrp->IoStatus.Status = status; pIrp->IoStatus.Information = ulInfo; IoCompleteRequest(pIrp, IO_NO_INCREMENT); return status;}
    多线程处理函数// 多线程处理函数VOID ThreadProc(PVOID StartContext){ DbgPrint("Enter ThreadProc\n"); // 获取内核对象 PKEVENT pKernelEvent = (PKEVENT)StartContext; // 设置输出缓冲区 RtlCopyMemory(g_szOutputBuffer, "I am DemonGan From Kernel Event.", (1 + strlen("I am DemonGan From Kernel Event."))); // 发送事件, 将事件对象设置为信号状态 if (NULL != pKernelEvent) { KeSetEvent(pKernelEvent, IO_NO_INCREMENT, FALSE); } // 释放事件对象 ObDereferenceObject(pKernelEvent); pKernelEvent = NULL; DbgPrint("Leave ThreadProc\n");}
    程序测试在 Win7 32 位系统下,驱动程序正常执行:

    在 Win10 64 位系统下,驱动程序正常执行:

    总结这个框架理解起来不是很复杂,关键是理解事件 EVENT 的同步处理,实现操作的先后顺序。
    1 留言 2019-02-14 13:41:31 奖励8点积分
  • 众源计划:上传资源,赚取现金红包啦 精华

    众源计划“WRITE-BUG技术共享平台”是一个专注于校园计算机技术交流共享的平台,面向的主要目标群体是我们计算机相关专业的大学生。在平台上,大家既可以交流学校课内学习的心得体会,也可以分享自己课外积累的技术经验。
    为了充实平台的资源库,更好地服务于各位同学,平台决定推出“众源计划”,有偿征集同学们自己计算机专业的作业、课程设计或是毕业设计等资源。“众源计划”的主要目的是创建一个具有一定规模的“技术资源库”,资源库里的每一份资源,都必须有详细的开发文档和可编译的源代码。
    作业、课程设计或是毕业设计等资源是同学们自己辛苦付出的成果,也是自己技术进步的见证。这部分资源通常都有详细的开发文档和完整的程序源代码,能够帮助其他初学者更好地消化和吸收将要学习的技术,降低学习门槛。所以,平台决定有偿征集这些资源。
    具体要求活动起止日期
    2018.12.20 - 2019.03.20
    活动对象
    在校或者已毕业的计算机相关专业大学生,院校不限
    奖励方式
    资源上传并审核通过后,根据资源质量,奖励每贴 10 - 100 元人民币(仅支持支付宝支付)
    上传流程
    会员登录自己的账号上传资源
    资源上传后,管理员会在 24 小时之内审核资源
    审核通过后,管理员会主动站内私信获取支付宝帐号,并立即发放奖金至所提供的支付宝账户

    审核重点
    重点审核资源是否具有详细的文档和完整的源代码
    审查资源是否原创,切勿重复提交

    资源要求“众源计划”仅对两类资源进行有偿征集,分别是“课内资源”和“课外资源”,各类资源具体要求如下所示。

    课内资源

    内容范围:计算机相关专业课内的毕业设计、课程设计、小学期、大作业等课程内开发的程序,程序包括游戏、PC程序、APP、网站或者其他软件形式
    内容要求:资源必须要包括完整的程序源代码和详细的开发文档或报告
    具体“课内资源”征集程序列表见附录一

    课外资源

    内容范围:计算机相关专业的课外自己主导研究游戏、项目、竞赛、个人研究等,区别于课程设计和毕业设计等课内资源
    内容要求:资源必须要包括完整的程序源代码和详细的开发文档或报告
    具体“课外资源”征集程序列表见附录二


    附录一注意:“众源计划”的题目范围包括且不限于以下题目

    汇编语言课程设计题目列表

    屏幕保护程序分类统计字符个数计算机钢琴程序字符图形程序音乐盒程序电子闹钟程序俄罗斯方块打字游戏图形变换程序吃豆子程序其他
    C语言课程设计题目列表

    学生成绩管理系统图书信息管理系统设计销售管理管理系统飞机订票管理系统教师评价系统学校运动会管理系统文本文件加密技术英语字典电话簿管理系统流星雨的实现其他
    C++语言课程设计题目列表

    学生学籍管理系统高校人员信息管理系统学生成绩管理系统车辆管理系统职工工作量统计系统学生考勤管理系统单项选择题标准化考试系统图书管理系统超市商品管理系统模拟ATM机存取款管理系统其他
    JAVA语言课程设计题目列表

    简单投票管理系统数学练习题目自动生成系统华容道小游戏电子英汉词典加密与解密标准化考试系统排球比赛计分系统学籍管理系统绘图系统图书信息管理系统其他
    C#语言课程设计题目列表

    学生信息管理系统学生综合测评系统图书管理系统学校运动会管理系统个人通讯录管理系统教师工资管理系统教师工作量管理系统趣味小游戏物资库存管理系统图形图像处理系统其他
    JSP语言课程设计题目列表

    微博系统基于web的学生信息管理系统在线计算机等级考试报名系统在线问卷调查系统网上销售系统论坛系统图书借阅管理系统网上购物系统工资管理系统酒店管理系统其他
    数据结构与算法课程设计题目列表

    设计哈希表实现电话号码查询系统电报压缩/解压缩系统电费核算系统机房计费管理系统公交线路查询系统用二叉平衡树实现图书管理系统运动会赛事安排动态表达式求值用线性结构实现学生成绩管理求解迷宫问题其他
    编译原理课程设计题目列表

    First集和Follow集生成算法模拟LL(1)分析过程模拟FirstVT集和LastVT集生成算法模拟算符优先分析表生成模拟算符优先分析过程模拟LR分析过程模拟PL/0语言的词法分析程序C语言的预处理程序自动机的状态转换图表示数组越界检查工具其他
    操作系统课程设计题目列表

    动态分区分配方式的模拟进程调度模拟算法请求调页存储管理方式的模拟P、V操作及进程同步的实现银行家算法SPOOLING假脱机输出的模拟程序文件系统设计动态不等长存储资源分配算法磁盘调度算法处理机调度算法模拟其他
    数据库课程设计题目列表

    高校学籍管理系统在线投稿审稿管理系统产品销售管理系统高校人力资源管理系统高校课程管理系统酒店客房管理系统报刊订阅管理系统医药销售管理系统学生学籍管理系统餐饮管理系统其他
    计算机网络课程设计题目列表

    TCP通信功能实现网络游戏的开发基于UDP协议网上聊天程序Ping 程序的实现数据包的捕获与分析FTP客户端设计包过滤防火墙的设计与实现简单的端口扫描器简单Web服务器的设计与实现HTTP客户端的设计与实现其他
    软件工程课程设计题目列表

    学校教材订购系统网上选课管理系统简易办公系统图书馆管理系统校园交流论坛网站超市收银系统ATM柜员机模拟程序企业办公自动化管理系统学生成绩管理系统进销存管理系统其他
    VC++程序设计课程设计题目列表

    模拟时钟程序单向链表的操作演示程序电影院售票系统俄罗斯方块五子棋24点游戏背单词软件的设计与实现酒店管理系统餐厅就餐管理系统吹泡泡游戏其他
    其他课程设计

    PHP语言课程设计PYTHON语言课程设计计算机图形学课程设计机器学习课程设计密码学课程设计其他

    附录二注意:“众源计划”的题目范围包括且不限于以下题目

    人脸识别系统车牌识别系统旅游自助APP疲劳驾驶识别检测系统考试管理系统WINDOWS驱动级安全防御系统WINDOWS平台逆向调试器坦克大战小游戏情感分析系统人机博弈的国际象棋游戏其他
    最终解释权归 WRITE-BUG技术共享平台 所有
    4 留言 2018-12-20 10:09:45 奖励100点积分
  • NT驱动程序与用户层程序通信

    背景当我们开发程序的时候,通常将一些需要权限的特殊功能放到内核层去执行。我们看到的界面程序是运行在用户层上,通过界面程序获取用户的操作信息,并传递给内核程序,在内核层中进行处理。
    本文就是要实现用户层和内核层数据通信的程序,向大家讲解用户层程序和 NT 驱动程序之间的数据交互的实现过程。现在,我就把程序实现过程和原理整理成文档,分享给大家。
    实现过程要实现用户层程序和内核层 NT 驱动程序的数据交互,我们需要分别对我们的用户层程序和驱动程序进行编码实现。首先,我们先来对用户层的程序实现过程进行分析,然后,在对内核层的驱动程序进行分析。
    其中,内核程序和用户程序的数据交互,有对于 IOCTL 控制码有 4 中操作类型:

    METHOD_BUFFERED:使用缓冲区方式操作
    METHOD_IN_DIRECT:使用直接写方式操作
    METHOD_OUT_DIRECT:使用直接读方式操作
    METHOD_NEITHER:使用其它方式操作

    本文讲解的程序,使用的是 METHOD_BUFFERED 缓冲区操作方式。
    用户层程序实现过程对于用户层来说,实现起来比较简单不是很复杂。具体操作是:

    首先,调用 CreateFile 打开驱动设备。驱动设备的名称是我们在写驱动程序的时候,设置的设备符号连接名称,改名称要以“\\.\”开头,在程序中要注意转移字符的问题,如:“\\\\.\\MyDevName”。这样,我们便可以获取驱动设备的句柄。
    然后,我们可以调用 DeviceIoDevice 函数,根据驱动设备句柄,并设置输入缓冲区以及输出缓冲区,向驱动程序传入数据以及获取驱动程序输出的数据,以此实现用户程序和驱动程序的数据交互。
    最后,当我们不再使用设备句柄的时候,便可以调用 CloseHandle 函数关闭设备句柄。

    那么,用户层的实现代码如下所示:
    int _tmain(int argc, _TCHAR* argv[]){ HANDLE hDevice = NULL; char szInput[] = "Who are U?"; DWORD dwInput = sizeof(szInput); char szOutput[MAX_PATH] = {0}; DWORD dwOutput = MAX_PATH; DWORD dwRet = 0; BOOL bRet = FALSE; // 打开设备 hDevice = ::CreateFile(SYM_NAME, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE, NULL); if (INVALID_HANDLE_VALUE == hDevice) { printf("CreateFile Error[%d]\n", ::GetLastError()); } // 数据交互 bRet = ::DeviceIoControl(hDevice, IOCTL_MY_TEST, szInput, dwInput, szOutput, dwOutput, &dwRet, NULL); if (FALSE == bRet) { printf("DeviceIoControl Error[%d]\n", ::GetLastError()); } // 关闭设备句柄 ::CloseHandle(hDevice); // 显示 printf("[From Kernel]%s\n", szOutput); system("pause"); return 0;}
    内核层程序实现过程对于,内核层驱动程序来说,操作就要比用户层复杂些。具体的操作是

    首先,我们需要在驱动程序的入口点函数中,设置驱动程序要处理 IRP 的消息。处理的消息有:DriverUnload、IRP_MJ_CREATE、IRP_MJ_CLOSE 以及 IRP_MJ_DEVICE_CONTROL 消息。其中,DriverUnload 表示驱动卸载的消息处理函数;IRP_MJ_CREATE 表示打开驱动设备的消息处理函数;IRP_MJ_CLOSE 表示关闭驱动设备的消息处理函数;IRP_MJ_DEVICE_CONTROL 表示获取控制码的消息处理函数。
    然后,为驱动程序创建驱动设备,并为驱动设备对象创建一个符号连接名称,方便用户层程序的调用。主要使用 IoCreateDevice 内核函数以及 IoCreateSymbolicLink 内核函数实现。
    最后,当驱动程序卸载的时候,我们也需要调用 IoDeleteSymbolicLink 函数删除驱动设备符号连接名称;调用 IoDeleteDevice 函数删除设备对象。

    其中,在 IRP_MJ_DEVICE_CONTROL 消息处理函数中,我们可以通过调用内核函数 IoGetCurrentIrpStackLocation 来获取 I/O 请求包 pIrp 的数据,从中获取控制码、输入缓冲区长度等信息 。直接从 I/O 请求包 pIrp 中,可以获取输入输出缓冲区的地址。在 METHOD_BUFFERED 缓冲区操作方式中,输入、输出缓冲区地址使用的都是同一个地址,同一块缓冲区。
    这样,我们就可以根据不同的控制码,从缓冲区中获取不同的数据,并返回不同的内核数据到输出缓冲区中,以此实现用户数据和内核数据的交互。
    那么,DriverEntry 函数代码如下所示:
    NTSTATUS DriverEntry(PDRIVER_OBJECT pDriverObject, PUNICODE_STRING pRegPath){ DbgPrint("Enter DriverEntry\n"); NTSTATUS status = STATUS_SUCCESS; pDriverObject->DriverUnload = DriverUnload; for (ULONG i = 0; i < IRP_MJ_MAXIMUM_FUNCTION; i++) { pDriverObject->MajorFunction[i] = DriverDefaultHandle; } pDriverObject->MajorFunction[IRP_MJ_DEVICE_CONTROL] = DriverControlHandle; // 创建设备 status = CreateDevice(pDriverObject); DbgPrint("Leave DriverEntry\n"); return status;}
    创建设备的代码如下:
    // 创建设备NTSTATUS CreateDevice(PDRIVER_OBJECT pDriverObject){ NTSTATUS status = STATUS_SUCCESS; UNICODE_STRING ustrDevName, ustrSymName; PDEVICE_OBJECT pDevObj = NULL; // 初始化名称 RtlInitUnicodeString(&ustrDevName, DEV_NAME); RtlInitUnicodeString(&ustrSymName, SYM_NAME); // 创建设备 status = IoCreateDevice(pDriverObject, 0, &ustrDevName, FILE_DEVICE_UNKNOWN, 0, FALSE, &pDevObj); if (!NT_SUCCESS(status)) { DbgPrint("IoCreateDevice Error[0x%X]\n", status); return status; } // 创建设备链接 status = IoCreateSymbolicLink(&ustrSymName, &ustrDevName); if (!NT_SUCCESS(status)) { DbgPrint("IoCreateSymbolicLink Error[0x%X]\n", status); return status; } return status;}
    IRP_MJ_DEVICE_CONTROL消息处理函数的代码如下:
    // IRP_MJ_DEVICE_CONTROL 消息处理函数NTSTATUS DriverControlHandle(PDEVICE_OBJECT pDevObj, PIRP pIrp){ NTSTATUS status = STATUS_SUCCESS; // 获取当前 IRP 栈空间数据 PIO_STACK_LOCATION pIoStackLocation = IoGetCurrentIrpStackLocation(pIrp); // 获取输入/输出缓冲区 PVOID pBuffer = pIrp->AssociatedIrp.SystemBuffer; // 获取输入缓冲区数据长度 ULONG ulInputLength = pIoStackLocation->Parameters.DeviceIoControl.InputBufferLength; // 获取输出缓冲区数据长度 ULONG ulOutputLength = pIoStackLocation->Parameters.DeviceIoControl.OutputBufferLength; // 实际输出数据长度 ULONG ulInfo = 0; // 获取控制码 ULONG ulControlCode = pIoStackLocation->Parameters.DeviceIoControl.IoControlCode; // 根据操作码分别进行操作 switch (ulControlCode) { case IOCTL_MY_TEST: { // 显示输入缓冲区内容 DbgPrint("[From User]%s\n", (PCHAR)pBuffer); // 向输出缓冲区返回内容 ulInfo = 14; RtlCopyMemory(pBuffer, "I am DemonGan", ulInfo); break; } default: break; } pIrp->IoStatus.Status = status; pIrp->IoStatus.Information = ulInfo; IoCompleteRequest(pIrp, IO_NO_INCREMENT); return status;}
    程序测试在 Win7 32 位系统下,驱动程序正常执行:

    在 Win10 64 位系统下,驱动程序正常执行:

    总结要特别注意一点就是,在驱动程序的入口点函数 DriverEntry 中,我们一定要对 IRP 消息 IRP_MJ_CREATE 和 IRP_MJ_CLOSE 消息进行处理,否则我们在用户层通过 CreateFile 和 CloseHandle 函数打开驱动设备并获取设备句柄和关闭设备句柄的时候会出错,不能正确执行程序。这也是很多初学者容易出错的地方,所以,大家一定要注意检查。
    2 留言 2019-02-12 18:57:58 奖励10点积分
  • 面试集合剑指offer——按之字形顺序打印二叉树

    题目描述请实现一个函数按照之字形打印二叉树,即第一行按照从左到右的顺序打印,第二层按照从右至左的顺序打印,第三行按照从左到右的顺序打印,其他行以此类推。
    思路简述利用两个栈的辅助空间分别存储奇数偶数层的节点,然后打印输出。或使用链表的辅助空间来实现,利用链表的反向迭代实现逆序输出。
    代码实现import java.util.ArrayList;import java.util.Stack;public class Solution { public ArrayList<ArrayList<Integer> > Print(TreeNode pRoot) { ArrayList<ArrayList<Integer>> res = new ArrayList<>(); if(pRoot == null) return res; Stack<TreeNode> s1 = new Stack<>(); Stack<TreeNode> s2 = new Stack<>(); s1.push(pRoot); int level = 1; while (!s1.empty()||!s2.empty()) { if (level %2 != 0) { ArrayList<Integer> list = new ArrayList<>(); while (!s1.empty()) { TreeNode node = s1.pop(); if (node!= null) { list.add(node.val); s2.push(node.left);//因为偶数层,先右后左,所以要先放左子树,栈 s2.push(node.right); } } if (!list.isEmpty()) { res.add(list); level++; } } else { ArrayList<Integer> list = new ArrayList<>(); while (!s2.empty()) { TreeNode node = s2.pop(); if (node!= null) { list.add(node.val); s1.push(node.right); s1.push(node.left); } } if (!list.isEmpty()) { res.add(list); level++; } } } return res; }}
    1 留言 2019-02-13 14:03:38 奖励5点积分
  • 面试集合剑指offer——二叉树打印成多行

    题目描述从上到下按层打印二叉树,同一层结点从左至右输出。每一层输出一行。
    思路简述使用两个队列(ArrayList模拟队列)实现的,一个队列一行分别进行模拟队列出队列。当第一个队列出来后,下一层的数据也就按照从左到右的顺序放入到第二个队列。先进先出,出来的顺序也就是从左到右。
    代码实现import java.util.ArrayList;import java.util.Stack;import java.util.Queue;public class Solution { ArrayList<ArrayList<Integer> > Print(TreeNode pRoot) { //s1存奇数层节点 ArrayList<TreeNode> list1 = new ArrayList<TreeNode>(); list1.add(pRoot); ArrayList<TreeNode> list2 = new ArrayList<TreeNode>(); ArrayList<ArrayList<Integer>> lists = new ArrayList<ArrayList<Integer>>(); while (!list1.isEmpty()|| !list2.isEmpty()) { if (!list1.isEmpty()) { ArrayList<Integer> temp = new ArrayList<Integer>(); while (!list1.isEmpty()) { TreeNode node = list1.remove(0); if(node != null) { temp.add(node.val); list2.add(node.left); list2.add(node.right); } } if (!temp.isEmpty()) { lists.add(temp); } } else { ArrayList<Integer> temp = new ArrayList<Integer>(); while (!list2.isEmpty()) { TreeNode node = list2.remove(0); if(node != null) { temp.add(node.val); list1.add(node.left); list1.add(node.right); } } if (!temp.isEmpty()) { lists.add(temp); } } } return lists; }}
    1 留言 2019-02-12 13:25:34 奖励5点积分
  • CR0方式读写内存与MDL方式读写内存

    背景通常,我们在内核中修改内存的时候,都是通过修改 CR0 寄存器,关闭内存写保护属性,然后再写入内存的方式来修改内存。我个人不喜欢这种方式,因为总感觉我们使用没有线程接口函数的方法,总感觉不太稳定,而且,在 64 位程序下,CR0 方式不再适用了。
    所以,我强烈推荐在内核下使用 MDL 方式来修改内存,在 32 位内核和 64 位内核下同样有效。
    实现过程在 32 位内核下,大都通过清除 CR0 寄存器的 Write Protect 位来关闭内存写保护属性,或者通过设置 CR0 寄存器的 Write Protect 位来启动内存写保护属性,代码如下:
    // 关闭写保护属性__asm{ cli push eax mov eax, cr0 and eax, not 0x10000 mov cr0, eax pop eax}
    // 开启写保护属性__asm { push eax mov eax, cr0 or eax, 0x10000 mov cr0, eax pop eax sti}
    这种方式,对于 64 位内核来说,已经不再适用。接下来,我们就开始介绍 MDL 方式来修改内存。
    内存描述符列表 (MDL) 是一个系统定义的结构,通过一系列物理地址描述缓冲区。MDL的全称是 Memory Descriptor List,即内存描述符表。可以通过MDL描述一块内存区域,在MDL中包含了该内存区域的起始地址、拥有者进程、字节数量、标记等信息。
    MDL 是用来建立一块虚拟地址空间与物理页面之间的映射。对这句话的理解是使用 MDL 来修改内核内存的关键。当我们要对一块内核内存进行修改的时候,我们先为这块内存创建 MDL,那么就会建立一块新的虚拟内存空间,与将要修改内存对应的物理空间相映射。也就是说,同一块物理空间,映射了两块不同的虚拟内存地址。我们可以通过这两个虚拟内存地址,来操作这块物理内存,这便是 MDL 修改内存的实现思路。
    那么,使用 MDL 方式修改内存具体的实现流程如下:

    首先,给定缓冲区的起始地址和长度,调用 MmCreateMdl 函数分配一个足够大的 MDL 结构来映射给定的缓冲区
    然后,调用 MmBuildMdlForNonPagedPool 函数来更新 MDL 对物理内存的描述
    最后,调用 MmMapLockedPages 函数将 MDL 中描述的物理页面映射到虚拟内存中,并返回映射的虚拟内存地址

    这样,我们就可以通过新映射的虚拟内存地址,操作同一块物理页面了,以此实现修改指定内存的数据。
    BOOLEAN MDLWriteMemory(PVOID pBaseAddress, PVOID pWriteData, SIZE_T writeDataSize){ PMDL pMdl = NULL; PVOID pNewAddress = NULL; // 创建 MDL pMdl = MmCreateMdl(NULL, pBaseAddress, writeDataSize); if (NULL == pMdl) { return FALSE; } // 更新 MDL 对物理内存的描述 MmBuildMdlForNonPagedPool(pMdl); // 映射到虚拟内存中 pNewAddress = MmMapLockedPages(pMdl, KernelMode); if (NULL == pNewAddress) { IoFreeMdl(pMdl); } // 写入数据 RtlCopyMemory(pNewAddress, pWriteData, writeDataSize); // 释放 MmUnmapLockedPages(pNewAddress, pMdl); IoFreeMdl(pMdl); return TRUE;}
    程序测试在 Win7 32 位系统下测试,两种方式实现的驱动程序正常执行;在 Win10 64 位系统下测试,MDL方式实现的驱动程序正常执行;
    总结对于修改 CR0 的 Write Protect 属性位方式来关闭写保护,以此来读写内存的方式仅适用于 32 位程序,对于 64 位程序不适用。建议使用 MDL 方式,创建一块新的虚拟内存,映射同一块物理内存的方式来修改内存,这种实现方式较为稳定,既适用 32 位程序,也适用 64 位程序。
    1 留言 2019-02-11 21:43:00 奖励6点积分
  • 面试集合剑指offer——从上往下打印二叉树

    题目描述从上往下打印出二叉树的每个节点,同层节点从左至右打印。
    思路简述
    根节点放到队列里面,队列不空,就打印队列头,打印这个节点,马上把这个节点的左右子节点放到队列中。再要访问一个节点,把这个节点的左右放入,此时队头是同层的,队尾是打印出来的左右。
    代码实现import java.util.ArrayList;import java.util.LinkedList;public class Solution { public ArrayList<Integer> PrintFromTopToBottom(TreeNode root) { ArrayList<Integer> layerList = new ArrayList<Integer>(); if (root == null) return layerList; LinkedList<TreeNode> queue = new LinkedList<TreeNode>(); queue.add(root); while (!queue.isEmpty()) { TreeNode node = queue.poll(); layerList.add(node.val); if (node.left != null) queue.addLast(node.left); if (node.right != null) queue.addLast(node.right); } return layerList; }}
    0 留言 2019-02-11 14:16:21 奖励5点积分
  • 大数据9.flume

    前文链接:https://write-bug.com/article/2123.html
    Flume日志收集系统
    Apache Flume是一个分布式、可信任的弹性系统,用于高效收集、汇聚和移动 大规模日志信息从多种不同的数据源到一个集中的数据存储中心(HDFS、 HBase)
    Flume它是一个消息采集系统,什么是消息采集呢?
    消息就是说你的数据源也就是你的消息源,在这个用户他会通过一系列行为他会留下大量的行为数据或者是行为消息,那这些消息都是更接近于更原始的最原生没有任何过滤的一些有价值的信息提取,相当于是整个的一个记录序列里面,它既有价值信息又有参杂的一些过渡修饰的一些结构,需要被过滤的一些消息,那这个时候你需要把这些大量的消息从数据源开始进行一个收集,因为用户在去留下它们的日志行为的时候,其实这些行为都会被留在了或者被散落在了各个不同的服务器的一个角落,那相当于这些服务器也就是散落在不同机房不同地域的一些各个的数据节点上或者一个服务器节点上,那这个时候它这个数据是非常一个散落的状态,这个时候就需要一个服务,把这些散落的这些原始日志进行一个统一的一个收集,然后供后续的整个的流程比如什么数据过滤,数据入库,数据挖掘等等这些后续我们有待于去进一步操作的事情,所以第一步首先想办法怎么把这个最原始的数据先对接过来,那这个时候就需要用类似于Flume这样的一个消息采集系统到目前为止我们在之前的学习过程中已经完成了很多的一些重要的一些组件,那不同的组件其实有各自的特点然后每一个组件都适合不同的场景,那其实我们在之前不管学hive也好学hbase也好或者是学hdfs,那你会发现这么多个组件其实在整个的这个架构里面它们都处于一个完整项目的一个中下游,中下游就是相当于说是消费者这么一个状态,那起码你的有生产者,生产者你不可能消息一开始就生产到了HDFS上对吧?
    然后这个时候你需要通过一个中间介然后把最原始的消息采集过来然后再去传到后面不管是HDFS上还是hbase这些等等的存储上面去,这个时候相当于我们之前学的不管是HDFS,hive,hbase都是属于下游的一个角色,而且我们还学了一些集成框架,这框架有mapreduce,storm以及spark都是来解决数据计算的一些问题,然后hive,hbase主要是解决一个数据存储和结构化的问题,所以这个时候我们既然已经学了这么多处于一个从消费者的状态这么一个角色一些个组件的话,我们就要想我们这些数据的源头是哪里对不对?那很多时候我们做项目的时候这数据已经给你好现成的了,你就直接去做处理就可以了,你不需要关心这个数据源在哪里,但是你从一个完整的一个项目的宏观角度去观察,你必须要知道这个数据它的来源是在哪里是不是?所以为了保证整个的项目的完整性,保证你对整个的一个数据流的一个打通一个鼻环的一个认识,就是你的数据采集这一块也是有必要掌握的对吧?所以数据采集这一块我们通常就用这个flume方式进行一个消息采集
    数据源:

    server Log(tail、grep查看):webserver
    远程调用:http接口(url)、RPC
    网络:netcat:IP:port(生产消费)
    文件系统:目录树数据变化
    终端:Console
    文本:Text
    数据库:Exec

    你有数据采集之后那你接下来就假设把这个原始信息拿到了,那么就需要把这个数据做一个缓存,先把这个消息进行存储起来,然后存储起来之后因为你这个消息会存在着大量的无效的一些信息,你需要做一些有效字段或者有效结构化一些提取,这时候就涉及到了数据过滤环节。
    跟着这样的一个思路,从数据源开始通过一个服务把数据采集下来,采集的数据需要通过某一个存储或者是某一个缓存把它暂时的存储起来,起码这个数据先落地,落地到具体哪个位置的话这个先不用考虑,起码先把这个数据拿下来,那下来这个数据因为比较原始所以你需要对这个数据进行过滤,因为这数据存在大量的无效的数据。
    然后接下来过滤完之后你需要做一些个转换工作,比如说你这个数据是从一个非结构化的数据变成一个结构化的数据是吧?你怎么把这个数据从你的文件系统里面怎么样转换到一个像数据表格那样,字段然后记录行列之间非常清晰分布这么一个状态,这样有便于后续的一些分析,那转换后的数据就很明显了,需要把它进行一些存储,比如说把它存到HDFS上或者存在你的hbase上等,好了你把这个数据存储完之后那接下来需要做一些检索工作,就需要用于一些检索用,那检索相信你这个数据存储的话你是肯定是要后面来用的对吧?那怎么让下游用的更方便或者是更快捷?这里面就肯定涉及到了一些键索引,那你比如说像之前搞过mysql的人,然后为了让你的数据检索的更快那它自身会支持一些个索引。
    那在整个的大数据里面比如说这个HDFS,比如存储在hbase上那这个时候估计大家就会有疑问了,这个索引怎么建呢?那这个时候其实跟我们后面要学的数据挖掘部分怎么去做一个数据分析还是有一定的关联的。Hbase这块它也是支持一定的索引的。
    那不管怎么说就目的就是能够快速的检索你的数据,那检索到数据之后就开始做一些个数据分析,然后分析后的数据就是把你有用的信息怎么去挖掘出来,然后把挖掘这最后的数据进行一些服务,大概是这么几个环节(如下图所示)

    那整个的一个数据的一个走向那从上游慢慢从水游一样游到了下游,那你会发现数据采集这一块是非常非常贴近于最源头了是吧?那所以今天我们就要了解数据采集这块,那有很多人不太理解为什么你今天非要讲flume,为什么不讲kafka,那有基础的同学那在上图1中里面应该是处于哪个模块?Kafka是一个消息缓冲器对吧?那么相当于kafka是相当于上图1中的缓存器,那么我们学完数据采集再去学kafka,像这样的话你会发现会整个的思路来说会比较更顺畅一些是吧?我们先学上游再去学下游这么一个思路,那么这个过滤是需要做什么呢?这个过滤结合我们之前的内容来说,我们这个过滤如果要做你想怎么做?你可以用mapreduce用storm等去实现,我们之后会有一个案例,怎么把flume和kafka和Storm关联起来对不对?相当于整个链路就打通了。那转换这一块就好说了,不就是转成hive或者是hbase。
    从最开始的图我们就可以看出Flume在其中是一个承上启下的角色,左边流入,右边流出,而且你后面的分析,数据挖掘都是在你的统一的这套存储系统上去做的,主要是想办法怎么去让日志消息收集过来,就是这么一个作用。除了上面多种方式高效接入接出数据的特点,Flume还支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等特点还可以被水平扩展。那么这几个特点是什么意思呢?
    比如说多个log服务器和一个flume和两个存储系统,好了那我这个日志就可以通过flume集群,其实这个flume只画了一个模块,其实这个flume你可以搭建一个集群的方式通过多个不同的机器来维护整套的消息采集这么一个系统,因为这个消息量还是很大的,你只要通过一台机器的话,那通常是搞不定,所以flume还是通过一系列集群来去并发的收集信息。
    然后flume的这个数据也可以进行一个多路输出,意思就是你可以把一个消息可以选择性的去存储到1还是2中,比如说你这个存储1,你这个存储1后面走的挖掘策略就不一样了。这存储2就是另外一套挖掘策略方案(如下图所示)

    而且这两个日志可能不是完全一样的,比如说这个日志1是来收集展现,这个日志2是来收集点击的,那这个策略1可能就是对展现日志进行一个处理,这个策略2对点击日志进行一个处理,那所以这个时候我们希望展现日志可以通过这么一个管道的方式能够有效的流入存储1里面去,我就不想这log1里面的日志不想流到存储2里面去,那么相当于是flume一旦发现你这个源是来自日志1,那我就可以自动把你这个数据直接放到存储1里面去,并不是说把你的数据能够复制,把同样的一个日志只要你这个下游都是对接到了flume上,那下游所有的节点都会收到同样的一份消息,那flume可以这样有区别的进行对待处理,这是一个它可以指定有效路径的方式,这个方式叫做复用机制。
    还有一个就是复制机制,就是说我不管你这个flume前面这个数据源是什么,只要是你来了一个数据源,只要是我这个存储是属于flume下游,那我所有的存储器我都会接收到同样的一份数据,比如说你这个log1的一条数据进来了,那比如说后面有两个存储,那相当于把你一份的数据我复制成两份,每一份节点我都发一份,这是一个复制机制。
    然后多管道接入流量这块也可以体现出这个问题,就相当于这两个日志有两个管道,那这两个log日志是来自不同的一个日志源级别的,这时候flume就可以通过不同的管道去对一些不同的日志源,然后多管道接出,剩下一个上下文路由,路由刚大概说过了,然后水平扩展这一块因为这个flume通常用的时候也是通过一个集群的方式去用,这个flume你可以进行一些个扩展,比如说你一些节点就是在数据采集过程中不够用了,你可以往上面加一些个节点或者资源然后共同的支撑共同的并发,相当于是可扩展性比较强把。

    那我们先从外部的整个框架入手,那最左边是一个消息的一个发生器(Data Generators),什么是消息发生器呢?就是一个日志服务器,比如说就是你们公司里面那个用户接收请求的它的一些请求信息,然后一些收集的这些服务器,就是一些webserver,那相当于日志发生器就是它已经开始陆续的产生消息了,那后面有一个橙色的一个大框对吧?,这里面就是一个整个的flume,然后flume就是从这个发生器(Data Generators)日志来进行采集,采集之后又得到了后面的一个HDFS或者是hbase存储里面去。
    好了这个时候把这个整个的flume这个黑盒的面纱解开你就可以看到这个flume,里面分成了一个Agent和一个Data Collector,Agent的意思就是一个代理模块,它是用来对消息进行接收和汇集。比如有两个log server1和log server2,那么这个Agent通常是部署到跟你同一个server同一台机器上,你这个og server1是用来不断的产生的日志消息,然后你一旦产生这个消息由这个Agent1这个消息来从你这个server上直接发送出去,那把消息发送给谁呢?就发送给一个叫collector(如下图所示)

    所以你从这图8里面就可以看出我虽然这个flume里面包含了Agent和这个collector,其实通常来说Agent和这个collector是分班部署到不同的节点上的,就是结耦。
    通常来说就是你的Agent会很多,因为你这每一个log server都有一个Agent,那你这个server通常会很多对不对?所以Agent也会很多。那这个时候你后面不是像上图8画的一样,一个Agent和一个collector一一对应并不是这样的,通常来说就是一个Agent可能会对应多个collector,就相当于是你前面有多个Agent的消息统一的被你的collector进行一个收集,一般来说server和Agent是一比一的,好了那Agent把消息发给collector,因为这两个属于不同的机器,这个时候collector会去把真正收集到的信息再去做一个存储(Storage),因为这个存储就HDFS或者是hbase,所以这一块就不需要大家开发了,那你就需要把collector怎么能够通过一个配置的形式把前面消息能够直接发送到指定的相应配置的目标路径上去就可以了,所以通过这个地方大家先了解一下你的Agent和你的collector的定位是怎样的(如下图所示)

    Agent就相当于冲到最前线的,这个collector就相当于是后方基地,然后不断去接收前线的一些消息,然后它在把这个消息怎么再往存储上再去做处理,collector也是可以多个的。
    接下来再看一下我们在去讲storm的时候,storm也类似于这么一个流程是不是?从头往后一个数据流进行一个传输是不是?然后再storm里面它的数据流也是有一定的单位的形式做传输对吧?那这个storm的单位是什么呢?tuple对吧?,那在hdfs上数据的单位是一个block对吧?在flume里面数据单位是Event,是一个事件。假如说在整个的flume里面它内部流转了这些消息都是一个事件,所以flume是用这个Event对象来做一个消息传递的格式(如下图)

    它属于内部数据传输的一个最基本单元,那你把这个事件已经打开,打开它有两个部分组成,那第一个部分就是一个Header,第二个部分就是一个Byte Payload,就是你的头和身体对吧?通常这个数据这个头部你可以有也可以没有这个可以选择的,不一定说这个header就是一定要存在的好吧?那如果说这个header要存在的时候可以理解为一个key,body你把它想象成一个value,如果你把数据有一个key有一个value,大家很容易想到的是在mapreduce里面有一个partition对吧?partition就是用来做分发消息的,也就是说这个Event有两个部分,Header和Byte Payload,这个Header是可有可无,如果是它没有这个Header只有byte Payload的时候,那么byte Payload其实就是存的是数据,那这个时候数据就开始往后流向进行传输是不是?这是一个最直接的流程,但是有的时候你需要对这个数据做一些个路由,最后一些个分发,就是说有的消息我想分发到A节点上但是我不想分发到B节点上,有的消息我想分发到B节点但是我不想分发到A节点上,那对于这种有特殊需求的情况,这个时候你就需要用到header,你必须要分配给它一个key,然后这个时候它做分发key的时候就根据你这个header里面的信息去做一个数据的一个路由,有点类似于分桶,所以相当于把这块跟我们之前学的partition结合起来可能理解起来更容易一些。
    header是key/value形式的,这个其实跟我们说的key和value不属于同一个层次,就是这个header如果你要是有这个信息,这个信息就大概长(k:v,k1: v1)这样的样子,然后你去做一个partition的时候你就根据这个key和value去做一个分发这么一个情况,所以这个时候大家就记住一点我的header就是为了做分发用的,Byte Payload就是存实际的数据内容用的。
    好了这个时候就讲了一个比较重点的东西,就是一个代理(如下图所示)

    这个Agent刚刚我们讲过了,这个flume可以拥有多个Agent,当然也可以拥有一个,然后每一个Agent就是一个进程,这个进程就相当于是在你的服务器上一直运行着然后一直监控着你的这个消息,一直监控着你这个日志的产生,一旦你有日志发生变化了,那这个进程就会把你的消息进行一个数据的收集然后往下游不断的传输。
    如果说你把这个Agent再进行打开,再进行把你的内部细节暴露出来,Agent就可以暴露出三个部分,这三个部分就是source和Channel和sink这三个,那么这三个模块有什么用呢?source就是真正对接你数据源进行输入,而Channel就是一个管道,sink就是一个输出,就是你的source把消息接收过来,然后消息会存到你的管道里面会做一个缓存,缓存我们之前学过一些对吧?这个缓存存储可以是文件形式的,相当于就是在你本地一个落地到磁盘上的那个文件对不对?还有一个就是在你内存分配出一个区域,就是这个数据在你内存中扭转的,不落地这是通过一个memory的形式,所以你的数据输入是可以存在你的文件里面也可以存到你的memory里面,那存到memory里面会更快一点,但是有一个问题就是一旦你这个agent出现了问题那你这个消息因为你存在memory里面,在消息可能会存在丢失的风险。但是为了保证你的消息的可用性可靠性通常建议把消息直接存在你的文件里面,但是这是存到你的文件里还是存到你的内存里面这是你要通过一个配置去配置的。

    source :输入-》对接各种数据源
    channel:缓存管道(file,memory)
    sink :输出-》对接各种存储

    然后输入就是对接各个数据源,输出就是对接各种存储,所以相当于是每一个组件都是各司其职,然后彼此之间能够协同工作,然后让消息能够有效的在内部进行一个扭转。如果要是在Agent里面我们在对各个组件在做一个更深入的了解,那我们接下来看一下source
    source是一个整个的flume的源,它是最贴近于你的消息源的那么一个模块对吧?。它相当于就是一个数据源的外部采集,然后把它外部源数据接收过来然后变化成flume可以识别的格式。这个格式就是一个事件(Event),然后在从这个flume开始内部进行流转。
    然后Channel就是一个通道,刚才我们一直说缓存,你可以把它理解成缓存就好
    通道:采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理
    所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存 起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channel是一个 完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source 和sink链接
    可以通过参数设置event的最大个数
    这时候大家会有一个疑问了,你是一个存储器,那如果要是采集的这个消息量非常大,那一旦超过了你这个缓存的限制,那相当于你的内存就爆掉了对不对?这时候会导致一些节点的风险对吧?会不会有一个数据的累计,然后不断的去膨胀这么一个风险。它是可以通过配置去配置的,就是控制流量,就是控制你这个source从外界接收这个数据每一次接收需要接收多少个事件,它是有一个流量控制的,如果你前面流量放的很足那肯定会对这个存储内存会有一定的压力,那一旦有压力你可以减少这样的一个采集量就可以能够进行一个减缓,这是可以通过一个配置event来进行配置。

    Flume通常选择FileChannel,而不使用Memory Channel
    Memory Channel:内存存储事务,吞吐率极高,但存在丢数据风险
    File Channel:本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)

    另外一个就是消息传到存储这块来之后,那它需要sinks来去对它进行一个消费,所以它这个Channel在这个source和sinks之间搭建了一个桥梁作用。那刚才我们说过了这个Channel它既然是一个存储,那你这个数据可以存到你的FileChannel里面,然后memoryChannel都是把数据存到内存,吞吐力高,效率高,但是容易存在丢数据的风险,那么FileChannel就是需要把你的数据落地了是不是?一旦你这个机器挂了,数据也不会丢失。
    然后sinks相当于就是在整个的Agent里面,sinks就是一个消费者,怎么把这个消息消费掉,那消息是在你的Channel里面,sinks会将你的消息或者你的事件从Channel里面进行,然后并且把你的事件开始往外输出,输出到外部的存储上面去

    Sink会将事件从Channel中移除,并将事件放置到外部数据介质上

    例如:通过Flume HDFS Sink将数据放置到HDFS中,或者放置到下一个Flume的Source,等到 下一个Flume处理。 对于缓存在通道中的事件,Source和Sink采用异步处理的方式
    Sink成功取出Event后,将Event从Channel中移除
    Sink必须作用于一个确切的Channel
    不同类型的Sink:

    存储Event到最终目的的终端:HDFS、Hbase 自动消耗:Null Sink 用于Agent之间通信:Avro

    然后一旦这个消息被sink消费掉之后,这个消息就会从这个Channel里面就会移除,它有点类似于队列的形式,那最后你这个数据是以哪种存储的形式落地了呢?是由sink来决定的,也就是你需要通过配置来控制sink你最终数据是怎么样的方式输出,你的数据是可以存在你的HDFS上或者是hbase上,它没有一个默认的一个输出,这个需要你通过一个很简单的配置你可以来控制这数据是怎么样输出来的,另外一个其实在整个的flume集群里面它是可以允许有类似多个flume,然后进行一个彼此之间的一个关联,这个像flume我们刚才打开过,它主要里面是一个Agent是吧?然后你可以把它当中一个玩具一样,然后进行一个彼此之间的拼装,然后你可以把这个集群做的规模很复杂或者一个很简单都可以,所以它在整个的集群或者是消息采集过程中它的这种集群搭建还是很灵活的。
    比如这个Agent如果你在本地搭建的时候,你这个Agent是可以直接存储到你的存储上的,这是可以的,但是有的时候你的Agent和你这个server是部署到了同一台机器上,那你这个机器就是为了来存储日志的,那你在给这个机器再去开放往这个Storage这个机器上去写的这个权限就不太合适,所以就需要把Agent数据再转到一个统一的一个中心,然后这个机器就可以进行一个对外的写服务(如下图所示)图:

    这个是一种形式,另外一个collector它得把这个分散的数据进行收集,所以通常用的时候就是配了一个agent和一个collector,但是从字面上来看这两个感觉差距很大,其实agent和collector你会发现配置的时候基本上是一样的,只不过是数据源不一样,agent的数据源是来自于你的外边真实的外部数据,你collector来的数据是来自于你的agent,就相当于你来自你的flume组件,其实这个agent和collector本质是一样的,只不过是数据源来源不一样,只不过是为了区分他们的角色,如果说遇到了这种内部组件之间的一个对接就相当于这两个flume之间的一个对接的话,那这个数据通过传输的方式的话就需要通过一个avro的方式进行对接(如下图16)

    你collector要对接的话必须要通过这样的方式去对接,你这个agent这个就很丰富了,对接着这种数据源的格式,数据源的类型就很丰富了。
    刚才我们已经说过了它一个agent内部分为最基本的三个组件是source和channcl和sink,那么这三个组件都是缺一不少的,但是它还有两种组件是可以选项,就是说你可以用可以不用,根据你的业务需求,如果你的业务需求确实是涉及到了这方面的一个要求那你这个组件就应该用,那么source和channcl和sink这三个是必须要有的,还是有两个可选的组件分别是interceptor和一个selector,interceptor是拦截器(如下图所示)


    Interceptor用于Source的一组拦截器,按照预设的顺序必要地方对events进行过滤和自定义的 处理逻辑实现
    在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到 source之前,对日志进行一些包装、清新过滤等等动作

    这个拦截器是在什么位置呢?就是说这个interceptor是在你的数据源和你的source之间的一个环节,那这个环节相当于就是可以对你的数据源提前会做一个过滤,然后这个selector是有点类似于路由选择,就是消息已经在这了,我开始对这个消息进行一个存储,你这个消息不是要存到channel上吗?如果你后面如果有多个channel的话,那这个消息我应该是存到哪个channel上,或者是所有的channel都应该存储这样的消息,这个时候就要配置一个selector,所以selector这块就是像我们最一开始说的复制与复用(如下图所示)

    我们继续看拦截器,拦截器主要是对这个event进行一个过滤或者是自定义一些处理逻辑的实现,它主要是在你这个日志与source之间的,然后对这个日志进行一个拦截,就相当于提前制定哪些日志可以往后传,哪些数据可以直接被丢掉,然后它除了拦截之后它还可以对你的日志数据重新做一些个包装,那主要的提供的一些拦截器就有这么几个

    Timestamp Interceptor:在event的header中添加一个key叫:timestamp,value为当前的时间戳
    Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip
    Static Interceptor:可以在event的header中添加自定义的key和value
    Regex Filtering Interceptor:通过正则来清洗或包含匹配的events
    Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

    这些拦截器可以直接互相组合,就是不仅仅通过一个拦截,可以通过多个进行一个拦截器的拼装,通过这个chain的方式进行一个组合起来,然后对于组合之后的话,你可以对它进行一个前后的一个顺序依次的处理。
    然后我们再看一下这个selector,这个selector这块也是容易理解的,刚才我们说过这个selector它有两个事情,一个是复制和复用对吧?那这复制就是分别对外两个配置,一个配置就是这个Replicating,还有一个复用Multiplexing,复制刚刚讲过了,就是一个消息能够被复制多份,复用就是一个消息可以选择性的去选择(如下图)

    channel selectors 有两种类型:

    Replicating Channel Selector (default):将source过来的events发往所有channel
    Multiplexing Channel Selector:而Multiplexing 可以选择该发往哪些channel

    这个source前面有两个不同类型的消息,那这个一个类型的消息你可以选择后面的一个channel,如果只选择某一个channel去做传递消息的话,你可以选择复用的方式,如果这一个消息可以被复制多份,就像一个广播的形式发送消息的话,广播是什么意思呢?广播就是一个消息,被复制出多份然后下游每一个节点都同时接收同样的消息是不是?,那这样的情况就可以用复制的形式去用。
    问题:Multiplexing 需要判断header里指定key的值来决定分发到某个具体的channel,如果demo和demo2同时运行 在同一个服务器上,如果在不同的服务器上运行,我们可以在 source1上加上一个 host 拦截器,这样可以通过header 中的host来判断event该分发给哪个channel,而这里是在同一个服务器上,由host是区分不出来日志的来源的,我们必 须想办法在header中添加一个key来区分日志的来源 – 通过设置上游不同的Source就可以解决
    然后接下来就看一下从整体的角度来看可靠性,那为什么说这个flume它的可靠性还是比较OK的呢?那从这么几点来看

    Flume保证单次跳转可靠性的方式:传送完成后,该事件才会从通道中移除
    Flume使用事务性的方法来保证事件交互的可靠性。
    整个处理过程中,如果因为网络中断或者其他原因,在某一步被迫结束了,这个数据会在下一次重新传输。
    Flume可靠性还体现在数据可暂存上面,当目标不可访问后,数据会暂存在Channel中,等目标可访问之后,再 进行传输
    Source和Sink封装在一个事务的存储和检索中,即事件的放置或者提供由一个事务通过通道来分别提供。这保证 了事件集在流中可靠地进行端到端的传递。

    Sink开启事务 Sink从Channel中获取数据 Sink把数据传给另一个Flume Agent的Source中 Source开启事务 Source把数据传给Channel Source关闭事务 Sink关闭事务

    首先就是说它有一个事务性,这个事务性什么意思呢?就是我们刚才已经提过了,它主要是和channel类型有关,刚刚我们说了channel类型有两个一个是file一个是memory对吧?为了保证消息不丢失,为了可靠就是你可以选择file对不对?通过file channel的方式去传输,然后另外一个就是说当我这个消息在传输的过程中,当传输到了下一个节点上,那如果要是说接收的这个节点出现了一些异常,比如说一些网络异常,那由此就会就可以导致你的数据就需要重发的,然后另外在同一个节点内,如果是source写入的数据,把这个数据已经写入到这个channel里面去,那这个数据在,比如说它写这个数据它也是成批成批写的,那同时在这批之类,它整体的数据出现了一些异常的话,那这个时候就所有的数据一旦有一个数据出现了异常,那同一批的其他数据都不会写入到channel里面去,那已经接收到的部分这批数据就直接被抛弃,然后这个时候靠上一个节点重新再发送一些数据,重新再补充一些数据进来,那这里面就涉及到了事务性,那flume是使用了一个事务性的方式来保证了传输event或者传输整个事件在整个过程中的可靠性,就是说在你sink必须在你的event传入到channel之后或者是已经这个event传输到下一个channel里面或者是你这个消息已经到了外部的数据目的地之后,就相当于你这个数据已经可以认为是被下游已经完整的接收到了,就是你的数据已经在下游非常可靠的落地了,这个时候你的event才能从你的channel中进行移除掉的,所以这样的机制才能保证你的event无论是在一个agent里面还是多个agent里面之间的流转都是可以保证可靠的,并且由于这样的一个事务保证,你整个event就可以被成功的存入起来。
    然后这是一个整个消息在传输过程中比如说一个端到端传递的一个步骤,就从你的前面的sink怎么把你的数据传输到下游的另一个agent里面去这么一个过程。
    好了这个flume大概我们之前也说了,可以支持一个很强的扩展性是不是?(如下图所示)

    你可以把它想象成一个乐高玩具一样,然后进行一个有效的拼装,比如说这是一个其中一个组件,然后把这个组件进行一个前后的关联,你可以把这个组件之间并行多套,或者你可以把这个组件进行上下游进行一个关联,就是上游的sink要对接到下游的source上面,也可以多个sink同时消息汇入到下游的同一个source里面去,并且你可以做更复杂的一些搭建,这些都可以通过简单的配置就可以去完成。
    复杂流动的目的就是说这根据你的业务场景的一个复杂程度了,那你每一套agent可能下游就是面对着业务处理的流程是不一样,这个是完全是可以根据你的进行一些选择,比如如下图所示

    每一个webserver上都部署一个agent对不对?那你这有三个webserver就对着三个agent对不对?那你在这个实际的架构过程中你的日志服务器是有很多节点的,那你不可能每一个节点,如果是没有这个Consolldation的话,你每一个节点都去往这个HDFS直接去写的话,这个就不太合适,因为你这个日志服务器它仅仅是用来做日志收集做的,而你把它权限在放开到再去写HDFS,它相当于是那个角色定位有些混乱,另外一个就是说你这个数据源并发同时去写的话对HDFS操作也不是一个很好的设计,所以最好把这里面数据都汇总到同一台或者是数目比较少的那几种,只要是你那个压力能扛得住的一些Consolldation进行一个集中处理,然后Consolldation再去对接到后台的一些存储服务,这样会前面和后面部分相当于它们面对的角色是不一样的。
    还有一个就比较典型了(如下图所示)

    这个sink和这个source的一个关联。然后我再看一下这个(如下图所示)

    这个上图其实有点像我们之前讲的路由选择,你看这个左边的source可能通过不同的方式去对接的,然后把这个不同的消息通过一个channel然后通过多个sink,每一个sink都对街到后面不同的应该是Consolldation,然后每一个Consolldation进行各自的一个日志收集,那你这里面就跟你的业务相关了,可能是你这个Consolldation之间收集的数据是一致的也有可能是不一致的对不对?,然后后面对着不同的sink,这不同的sink就往后端存储的时候你可以写到HDFS上也可以写到hbase上,因为你这个数据是可以,比如说这两个channel就对接着不同的存储是不是?那你上面的这个channel是往HDFS上去写,下面这个channel是往本地文件去写,但是你可以再搞一个channel然后往Hbase上去写都可以很随意。

    搭建http://note.youdao.com/noteshare?id=7d903eb22b05f0b4943389bfc5c6d51f&sub=1993705EC6A1439DB03B29D68D278BFC
    2 留言 2019-02-10 12:33:52 奖励16点积分
  • ring0下使用ZwTerminateProcess函数结束指定进程

    背景在用户层上,Windows 给我们提供了一个 WIN32 API 函数 TerminateProcess 来结束指定进程。那么,在内核层下,也给我们提供了一个相应的内核函数来实现相应的功能,这个内核函数就是 ZwTerminateProcess。
    本文就是介绍在内核下使用 ZwTerminateProcess 函数来结束指定进程,现在,我就把实现的思路和原理整理成文档,分享给大家。
    函数介绍ZwOpenProcess 函数
    打开进程对象的句柄,并设置该对象的访问权限。
    函数声明
    NTSTATUS ZwOpenProcess( _Out_ PHANDLE ProcessHandle, _In_ ACCESS_MASK DesiredAccess, _In_ POBJECT_ATTRIBUTES ObjectAttributes, _In_opt_ PCLIENT_ID ClientId);
    参数

    ProcessHandle [out]指向类型为HANDLE的变量的指针。 ZwOpenProcess例程将进程句柄写入该参数指向的变量。DesiredAccess [in]一个ACCESS_MASK值,其中包含调用者对进程对象请求的访问权限。ObjectAttributes [in]指向OBJECT_ATTRIBUTES结构的指针,该结构指定要应用于进程对象句柄的属性。在Windows Vista和更高版本的Windows中,此结构的ObjectName字段必须设置为NULL。在Windows Server 2003,Windows XP和Windows 2000中,此字段可以作为选项指向对象名称。有关详细信息,请参阅以下备注部分。ClientId [in]指向客户端ID的指针,用于标识要打开其进程的线程。在Windows Vista和更高版本的Windows中,此参数必须是指向有效客户端ID的非NULL指针。在Windows Server 2003,Windows XP和Windows 2000中,此参数是可选的,如果ObjectAttributes指向的OBJECT_ATTRIBUTES结构指定对象名称,则可以将其设置为NULL。有关详细信息,请参阅以下备注部分。
    返回值

    成功,则返回 STATUS_SUCCESS;否则,返回错误码。

    ZwTerminateProcess 函数
    终止进程及其所有线程。
    函数声明
    NTSTATUS ZwTerminateProcess( _In_opt_ HANDLE ProcessHandle, _In_ NTSTATUS ExitStatus);
    参数

    ProcessHandle [in]表示要终止的进程的进程对象的句柄。ExitStatus [in]操作系统用作进程及其每个线程的最终状态的NTSTATUS值。
    返回值

    成功,则返回 STATUS_SUCCESS;否则,返回错误码。

    实现原理这个程序的实现和用户层实现很类似,但也有些区别。实现的具体过程如下:

    首先,先试用 ZwOpenProcess 函数根据进程的 PID 打开指定进程。要注意的就是,进程的 PID 并不是像用户层那样直接传参,而是初始化到 CLIENT_ID 结构体当中。那么,成员 UniqueProcess 就是表示进程 PID。
    然后,我们就可以根据进程句柄,调用 ZwTerminateProcess 函数结束进程。终止状态可以任意指定。
    最后,我们也要调用 ZwClose 函数来关闭打开的进程句柄,释放资源。

    经过上面 3 步,就可以实现了内核层结束进程的常规方法。
    编码实现// 结束进程BOOLEAN TerminateProcess(HANDLE hProcessId){ HANDLE hProcess = NULL; CLIENT_ID stClientId = { 0 }; OBJECT_ATTRIBUTES objectAttributs = { 0 }; stClientId.UniqueProcess = hProcessId; stClientId.UniqueThread = 0; InitializeObjectAttributes(&objectAttributs, 0, 0, 0, 0); // 打开进程,获取进程句柄 NTSTATUS status = ZwOpenProcess(&hProcess, 1, &objectAttributs, &stClientId); if (!NT_SUCCESS(status)) { ShowError("ZwOpenProcess", status); return FALSE; } // 结束进程 if (NULL != hProcess) { ZwTerminateProcess(hProcess, 0); } // 关闭进程句柄 ZwClose(hProcess); return TRUE;}
    程序测试在 Win7 32 位系统下,驱动程序结束进程成功;在 Win10 64 位系统下,驱动程序结束进程成功;
    总结和用户层进行类比,大体上是一样的流程和操作原理。要今后学习内核编程的时候,很多时候都可以通过类比用户层的方式来学习记忆,这样可以达到事半功倍的效果。
    1 留言 2019-02-10 20:10:18 奖励10点积分
  • 面试集合剑指offer——二叉搜索树第k个结点

    题目描述给定一颗二叉搜索树,请找出其中的第k小的结点。例如, 5 / \ 3 7 /\ /\ 2 4 6 8 中,按结点数值大小顺序第三个结点的值为4。
    二叉查找树(Binary Search Tree),(又:二叉搜索树,二叉排序树)它或者是一棵空树,或者是具有下列性质的二叉树: 若它的左子树不空,则左子树上所有结点的值均小于它的根结点的值; 若它的右子树不空,则右子树上所有结点的值均大于它的根结点的值; 它的左、右子树也分别为二叉排序树。
    思路简述
    递归的方式:二叉搜索树的中序遍历就是排序的,所以用中序遍历,每一次中间的时候判断是否等于k即可。
    非递归的方式:运用栈进行操作。相当于用栈实现了中序遍历,在中间进行了个数的判断。

    代码实现递归
    int count = 0; TreeNode KthNode(TreeNode pRoot, int k) { if(pRoot != null) { TreeNode leftNode = KthNode(pRoot.left, k); if(leftNode != null) return leftNode; count++; if(count == k) return pRoot; TreeNode rightNode = KthNode(pRoot.right, k); if(rightNode != null) return rightNode; } return null; }

    TreeNode KthNode(TreeNode pRoot, int k) { Stack<TreeNode> stack = new Stack<TreeNode>(); if(pRoot==null||k==0) return null; int t=0; while(pRoot!=null ||stack.size()>0){ while(pRoot!=null){ stack.push(pRoot); pRoot = pRoot.left; } if(stack.size()>0){ pRoot= stack.pop(); t++; if(t==k) return pRoot; pRoot= pRoot.right; } } return null; }
    0 留言 2019-02-10 17:06:45 奖励5点积分
  • 使用WinDbg双机调试SYS无源码驱动程序 精华

    背景有很多学习逆向的小伙伴,逆向一些用户层的程序很熟练了,但是由于没有接触过内核驱动开发,所以对于驱动程序的逆向无从下手。
    对于驱动程序的调试可以分为有源码调试和无源码调试。本文主要讨论无源码驱动程序的调试,也就是逆向驱动程序的方法和步骤。本文演示的是使用 VMware 虚拟机和 WinDbg 程序实现双击调试。
    实现过程VMware虚拟机设置1.打开相应 WMware 虚拟机上的 “Edit virtaul machine settings”。

    2.“Hardware”选项中 —> 点击“Add”,添加一个串口设备 Serial Port 。如果有打印机(Printer)存在,则先移除虚拟机的 打印机 硬件,然后再添加串口设备 Serial Port,因为打印机会占用串口 COM1。

    3.“Next”,在“Serial Port” 里选中 “Output to named pipe”。

    4.“next”,然后如下设置:

    5.“Finish”之后,回到如下“Virtual Machine Settings”页面,在“I/O Mode” 里选中“Yield CPU on poll”。

    6.点击“OK”之后,WMware 虚拟机设置就完成了。接下来,我们开机,进入虚拟机系统中,并对虚拟机系统进行设置,将其设置成调试模式。
    虚拟机里的操作系统设置1.如果操作系统不是 Win10,则开机进入桌面后,在运行窗口输入 msconfig —> 引导 —> 高级选项 —> 调试 —> 确定。

    2.如果操作系统是 Win10,则
    1.在设置 —> 安全和更新 —> 针对开发人员 —> 开发人员模式;

    2.管理员身份运行CMD,输入 bcdedit /set testsigning on 开启测试模式;

    3.在运行窗口输入 msconfig —> 引导 —> 高级选项 —> 调试 —> 确定;

    3.关机重启,这样虚拟机里的操作系统就设置完成了。接下来,就开始在真机系统上对 WinDbg 程序进行设置并下断点调试了。
    使用 WinDbg 程序开始双机调试无源码驱动程序1.我们在真机上以管理员身份运行 WinDbg,点击 File —> Kernel Debug —> COM,然后在 Port中输入:\\.\pipe\com_1,其它都勾选上,点击“确定”。

    2.通常 WinDbg 就会一直显示 等待建立连接(Waiting to reconnect…) 状态,如下面所示:

    3.这时,需要我们点击 Break(Ctrl+Break) 暂停调试。这样,虚拟机就会停下来,我们就可以在 WinDbg 中输入命令。


    4.我们就可以输入命令,使用 WinDbg 程序来调试虚拟机中的操作系统内核。先来介绍些常用的 WinDbg 命令:
    lm :表示列举虚拟机加载和卸载的内核模块起始地址和结束地址。bu、bp :下断点。u、uf :反汇编指定地址处的代码。dd : 查看指定地址处的数据。dt : 查看数据类型定义。
    其中,bu 命令用来设置一个延迟的、以后再求解的断点,对未加载模块中的代码设置断点;当指定模块被加载时,才真正设置这个断点;这对动态加载模块的入口函数或初始化代码处加断点特别有用。
    在模块没有被加载的时候,bp 断点会失败(因为函数地址不存在),而 bu 断点则可以成功。新版的 WinDBG 中 bp 失败后会自动被转成 bu。
    那么,在无源码的驱动程序 .sys 的入口点函数 DriverEntry 中下断点的指令为:
    bp 驱动模块名称+驱动PE结构入口点函数偏移// 如:bp DriverEnum+0x1828
    5.我们演示调试无源码驱动程序 DriverEnum.sys,虚拟机系统环境是 Win10 64位。然后,在 WinDbg 程序中输入指令:bp DriverEnum+0x1828。其中,bp表示下断点;DriverEnum 表示驱动程序 DriverEnum.sys 的驱动模块名称;0x1828 表示驱动程序 DriverEnum.sys 的入口点偏移地址,这个偏移地址可以由 PE 查看工具查看得出,如下图:

    输入完下断点指令后,我们在输入 g,让系统继续执行,直到执行到断点处,便会停下。

    我们,在虚拟机系统里,加载并启动我们的驱动程序 DriverEnum.sys,在启动之后,真机上的 WinDbg 成功在 DriverEnum.sys 的入口点 DriverEntry 处停下。这时,我们可以使用快捷键 F10 或者 F10 步过或者步入来单步调试,还可以继续使用 bp 下多个断点。

    总结步骤并不复杂,只是啰嗦而已。大家细心点跟着上述教程,认真操作就可以成功对无源码的驱动程序的入口点函数 DriverEntry 下断点,实现调试。
    1 留言 2019-02-09 22:44:12 奖励12点积分
  • 大数据8.Zookeeper

    前文链接:https://write-bug.com/article/2119.html
    前面很多文章已经多多少少涉及到了zookeeper,后面的文章也会更加依赖zookeeper,所以这里正式介绍一下:
    Zookeeper原理前文说过,其角色是生态中的一个权威角色,贯穿与整个生态,那么它为什么可以有效的协调不同机器之间的工作呢?
    zookeeper是一个分布式锁服务、名字服务器、分布式同步、组服务,基于Paxos协议在集群内访问任何一台机器得到result都是一样的。(Google内部实现叫Chubby)
    我们在单机开发的时候涉及到锁的是多线程开发:内存锁,互斥锁,读写锁等等,在一个进程内部对公共资源进行竞争,而一个进程空间的内存地址是一致的,所以导致不同的线程对同一块进程空间都是有操作权限的,代码与代码段有公共区域读写的时候会造成混乱,需要有一个锁来控制顺序关系。

    在分布式系统中,就不能通过几行代码去写一个锁解决机器与机器之间的协调和大量数据了,机器之间是独立隔离的,为了防止分布式系统中多个进程之间互相干扰需要有个进程进行调度,这个技术核心就类似于分布式锁服务。
    核心问题:没有一个全局的主控,协调和控制中心
    我们需要一个松散耦合(对硬件依赖不强)的分布式系统中粗粒度锁(粗到节点顺序工作就可以)以及可靠性存储(低容量存储数据)的系统去解决这个问题。
    数据模型:与标准文件系统很相似,但不能像Linux一样 : cd ..
    只能通过一个绝对路径去访问某一个节点,这有点像HDFS操作命令的路径,并且在zookeeper这么一个节点里面,它可以存储一些数据,并且他自然而然就给你配一些属性,这个属性就关于你数据的长度,创建时间,修改时间等等,然后你这个节点具有文件属性又有路径的一个特点,即它可以存数据,又可以通过一个绝对路径能够访问到指定的节点。
    在Zookeeper里面没有文件和路径的说法的,其实每个文件和目录都是一个节点
    那说到节点的话,这节点就有一定的属性了,节点属性有四个:

    Persistent Nodes:永久有效的节点,只要不手动删除(Client显式的删除),系统不崩溃文件永久存在
    Ephemeral Nodes:临时节点,仅在创建该节点client保持连接期间有效(心跳机制),一点连接丢失,zookeeper会自动删除该节点,(机器挂掉的时候所连接创建的该目录被删除,即使机器恢复也只能再次创建才可以)
    Sequence Nodes:(名字服务器(名字不重复))顺序节点,client申请创建该节点时,zookeeper会自动在节点路径末尾添加递增序列号,这种类型是实现分布式锁,分布式queue等特殊功能的关键;不允许单独存在,需要和前面两种任何一种同时存在

    监控机制

    getChildren():zookeeper是有监控功能的,可以监控某台机器是否生效,比如可以应用在HDFS2.0的主备切换机制里,如果主挂了,就可以监控并启动备份节点了,这个2.0中介绍的很详细了,那么如何监控到主挂了呢,就是通过临时节点监控这个机器节点的,如果这个机器一旦出现异常,临时节点就消失了,那么感知到节点消失这个事件发生的就是消失节点的父节点,再由父节点主动去上报备用节点或者不同场景的上层服务器节点(比如流量分发器), 比如网民访问新浪首页,流量分发器同时分发50%流量给两台服务器,但是server1挂了,创建的节点消失了,那流量分发器如何检测到server1挂了呢?父节点会主动上报(数据里面可以写是连接的哪个连接IP地址,其实流量分发器可以把父节点下的所有节点遍历一遍,就可以知道映射和连接的那个节点IP就知道哪个服务器挂了,不给分发流量就行了),以上是一个getChildren()的监控
    getData():节点数据发生变化的监控
    exists():节点是否存在

    三个关键点:

    一次性监控,被触发后,需要重新设置(只上报一次,上报一次后节点挂了或者重新恢复节点时候,父节点已经不会通知了,每次需要触发的时候就要重新设置)
    保证先收到事件,再收到数据修改的信息
    传递性,如create或者delete会触发节点监控点,同时也会触发父节点的监控点

    风险:

    客户端看不到所有数据变化,比如说网络原因(比如由于网络IO接收不到监控变化)
    多个事件的监控,有可能只会触发一次。一个客户端设置了关于某个数据点exists和get Data的监控,则当该数据被删除的时候只会触发被删除的通知
    客户端网络中断的过程无法收到监控的窗口时间,要由模块进行容错设计

    数据访问权限
    zookeeper本身提供了ACL机制,表示为 scheme: id: permissions,第一个字段表示采用哪一种机制,第二个id表示用户,permissions表示相关权限(如只读,读写,管理等),每个节点上的“访问控制连”(ACL,Access Control List)保存了各客户端对于该节点的访问权限
    例:
    IP:19.22.0.0/16,READ 表示IP地址以19.22开头的主机有该数据节点的读权限
    权限:

    CREATE 有创建子节点的权限
    READ 有读取节点数据和子节点列表的权限
    WRITE 有修改节点数据的权限,无创建和删除子节点的权限
    DELETE 有删除子节点的权限
    ADMIN 有设置节点权限的权限

    模式机制:

    World 它下面只有一个id, 叫anyone, world: anyone代表任何人,zookeeper 中对所有人有权限的结点就是属于world: anyone的
    Auth 已经被认证的用户(可以用过用户名:密码的方式,kerberos)
    Digest 通过username:password字符串的MD5编码认证用户
    Host 匹配主机名后缀,如,host: corp.com匹配host: host1.corp.com, host: host2.corp.com,但不能匹配host: host1.store.com
    IP 通过IP识别用户,表达式格式为 addr/bits

    public class NewDigest {public static void main(String[] args) throws Exception {List<ACL> acls = new ArrayList<ACL>(); //添加第一个id,采用用户名密码形式 Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin"));ACL acl1 = new ACL(ZooDefs.Perms.ALL, id1);acls.add(acl1); //添加第二个id,所有用户可读权限Id id2 = new Id("world", "anyone");ACL acl2 = new ACL(ZooDefs.Perms.READ, id2);acls.add(acl2);// zk用admin认证,创建/test ZNode。 ZooKeeper zk = new ZooKeeper("host1:2181,host2:2181,host3:2181", 2000, null); zk.addAuthInfo("digest", "admin:admin".getBytes());zk.create("/test", "data".getBytes(), acls, CreateMode.PERSISTENT); }}
    API参考:http://zookeeper.apache.org/doc/r3.3.3/api/org/apache/zookeeper/ZooKeeper.html应用

    配置管理

    更新配置文件(过滤,地域等等策略),人工操作单机机器多时过麻烦,脚本配置机器压力过大,而在zookeeper集群中只需要sever都访问这个节点更新配置就好了,虽然是个一个节点,但背后是个集群,压力会被集群后的机器自然的分散开。

    集群管理

    监控机器状态-》比如临时节点getChildren()遍历选主-》临时节点+顺序节点:选择当前是最小编号的 Server 为 Master ,主节点挂掉时,临时节点消失,选新主节点自动加1,成为新主节点,当前的节点列表中又出现一个最小编号的节点,原主节点复活后想成为主节点,需要创建新节点尾号再加1成为一个普通节点。
    共享锁服务


    控制不同节点顺序(粗粒度)协同,与选主类似,先处理最小编号节点进程任务

    队列管理
    同步队列 • 所有成员都聚齐才可使用—getChildren方式父节点通知FIFO队列 • 生产消费者—最小编号先处理

    zookeeper安装
    参考:http://note.youdao.com/noteshare?id=168883c03a9eb0d7b2e0c5a0a0216e1a&sub=963C8CD231AD4BF5ACE97C2FCAD96431
    实践
    基本操作命令
    执行客户端 zkCli.sh

    ls / 查看当前目录
    create /text “test” 创建节点
    create -e /text “test” 创建临时节点
    reate -s /text “test” 创建序列节点
    get /test 查看节点

    Java代码操作
    0 留言 2019-02-09 14:24:57 奖励15点积分
  • 大数据7.HBase

    前文链接:https://write-bug.com/article/2108.html
    HBase原理上节我们介绍了Hive,其本质为MR Job,而HBase不同于Hive,它是一个面向存储的开源的非关系型分布式数据库(NoSQL),适合存储海量、稀疏数据(不保证每条记录每个字段都有值),大部分数据存在HDFS上,少量在自身内存cache中。
    Hbase(Nosql数据库)与关系型数据库对比:行存储:关系型数据库

    优点:写入一次性完成,保证数据完整性,写入时做检查
    缺点:数据读取过程中产生冗余数据,若有少量数据可以忽略

    列存储:Nosql数据库

    优点:读取过程,不会产生冗余数据,只查询了一个字段,特别适合对数据完整性要求不高的大数据领域
    缺点:写入效率差,保证完整性方面差

    优势:

    海量数据存储:大量数据存在HDFS中
    快速随机访问:在HDFS中更适合从头到尾的这种顺序读取,不适合随机访问,而Hbase只需要读取某一key的字段就可以了
    可以进行大量改写操作:为什么HDFS中不能呢?HBase在修改数据时,是在数据dump到磁盘之前还在cache中的时候做修改的,这样的速度是HDFS所不能比的,还有HBase在改写一个字段的时候,实质上还是在cache中完成操作的,而dump磁盘是一个异步的过程,其实修改后的数据与旧数据到了两个文件中,我们用户感知到的只是查询数据时候其返回了我们修改后的最新数据就可以了,这里我们下面会介绍的version号就知道了

    Hbase结构逻辑模型

    RowKey:有了这个key,我们才能查询到后面的value,是表中每条记录的“主键”,方便快速查找,Rowkey的设计非常重要。
    Column Family:列族,拥有一个名称(string),包含一个或者多个相关列
    Column qualifier:属于某一个columnfamily,familyName :columnName,每条记录可动态添加


    我们有了上面这三个信息,就可以定位到具体的列字段了。
    再看上面的表中有些cq中包含着几个timestamp时间戳,背后存着不同的value值,这就是前面说的改写具体字段存在不同的文件中。
    Version Number:类型为Long,默认值是系统时间戳,可由用户自定义,每一个时间戳的背后存着不同的版本,可以设置个数,默认显示最新时间戳的字段(即倒序)
    三维有序:
    三个维度:
    -rowkey
    -column family->column qualifier
    -version(timestamp)->value

    {rowkey => {family => {qualifier => {version => value}}}}
    a :cf1 :bar :1368394583:7

    有序:我们在存储数据时,系统会按照rowkey字典序去排序数据,然后里面列存储再按照cf、cq的字典序,和时间戳的倒序排序数据。
    物理模型我们前面介绍的数据模型格式都是以一张表格的形式去做剖析,为了我们所理解,而出现的逻辑概念。而真正机器存储时是怎么存储的呢?

    我们这里抽调出一张逻辑表格来看,假如这张表格数据量非常的大,一台机器很难存储一张表格,那我们计算机应该怎样去存储呢?肯定是把这张表格分片到很多台机器中才符合我们处理大数据的思想嘛,我们看到图中的数据依旧按照rowkey的字典序进行排序,并且上面切分为好多的region(区域),注意!这里的切分是按照行rowkey切分的。

    这里有一个细节:我们切分前后,里面的数据都是按照rowkey排好序的
    Hregion就相当于前面MR中的partition功能,进来一条数据,按照rowkey分配插入到一个region中,因为前面说了数据是按照key排好序的,所以这条数据就不可能分配到其他的region里面了,即相同的key也同样是肯定在同一个region上
    HBase集群分布数据的最小单位为region。想象成HDFS上的block,如果一台机器挂了,其上的所有block都将消失,数据也就失去了我们设置的3副本的平衡,需要去别的机器拷贝出数据到新的DN上来达到平衡和负载均衡(请求压力)。而HRegion是分布式存储负载均衡的最小单元,但不是最小存储的最小单元
    HRegion是物理概念(进程),Region是逻辑概念

    行锁定:假如我们rowkey为itemid,userA正在对一个itemid的字段做修改,与此同时,userB也想对这个字段的不同列做修改是不允许的,只有当时读写的进程具有绝对的主权,粒度按行锁定(粗)。

    前面说过,region数据分布就像HDFS中的block,所以换言之一个机器节点中有多个region,这个节点里面有一个进程叫Regionserver,主要负责用户的IO请求,与HDFS的交互,也就是一个HRegionserver进程中存储管理了多个HRegion对象(不一定来自于同一个table)。

    HRegion再划分,里面又分为多个Hstore,HStore是HBase中核心的存储单元。
    Hstore由两个部分组成:内存和HDFS文件
    即HStore=Memstore+StoreFile(HFile)
    HStore—-对应逻辑table中的columnFamily
    MapReduce中Memstore:100M->80%->往磁盘上spil
    Memstore: 写缓存,默认128M->将内存Flush为一个StoreFile,每个CF都有自己的Memstore
    HLog: WAL(Write-Ahead-Log)预写日志,避免数据丢失

    我们在往memStore和Hstore写数据时,假设正在写过程此Hregion挂了,那么我们写好的数据就会丢失,所以它不会直接写到某个机器上,而是先写到HDFS上,先有一个HDFS的公共区域(HLog),写完之后再写到一个Hregion里面去,所以这时Hregion挂了,我只需要从HLog中恢复数据就好了。
    每一个RegionServer都可以读取Hlog,管理维护多个Region,每一个Region都可以读HLog,如果Region不出现错误情况下,就和HLog没有任何交互,如果一个RegionServer挂掉,涉及到Region迁移,里面的数据已经消失了,是在Hlog中恢复到其他Region Server中,就像Mysql中的远程备份,replication,只是把Mysql操作的每一条sql语句重新执行一遍,就相当于恢复数据了。
    Client往Region Server 端提交数据时,会先写WAL日志,写成功后才会告诉客户端提交数据成功。在一个RegionServer上的所有的Region都共享一个HLog,一次数据的提交先写WAL,写入成功后,再写memstore,当memstore值到达一定阈值,就会形成一个个Store File(理解为Hfile格式的封装,本质为Hfile)
    blockcache:读缓存,加速读取效率(假设没有blockcache,读取顺序(指定好rowkey,CF: CQ):rowkey-》某HRegion-》CF-》Hstore-》MemStore-》二分法查找Store File(因为dump时rowkey是排序好的))读取后再缓存到blockcache中。

    这里引发一个问题:读取时可能会在memstore和多个Hfile中都出现相同的rowkey,这种情况怎么办呢,只能靠我们平时的合并机制(通常把很小的region merge到一起,手动完成,整合相同rowkey)了。

    分裂:我们在按照rowkey分配region时可能是同一个范围但一般每个region存储数据的规模是不一样的,随着数据插入,region随之增大,当增大到一个阈值时,按照大小平均分裂成两个新的region,分区参数默认10个G。
    Region数目:每一个region信息都要在zookeeper上有一个节点来维护。这之前还没讲过zookeeper,可以先类比HDFS的block在Name Node中作为元数据,不支持小文件。所以:

    太多:增加zookeeper负担,读写性能下降
    太少:降低读写并发性能,压力不够分散。

    对于region过大的要做切分,切分更小粒度的region分散到不同的regionserver上去缓解压力作负载均衡。
    这里出现一个问题:如果用户正在访问正在分裂的region上的数据时,是请求不到数据的,只能等到分裂完成后才能为用户继续服务。通常情况下,我们把自动切分禁止,请求量空闲时手动切分。
    Hbase架构
    我们在图中可以看到主要有这么几个角色(client、Hmaster、HRegionSever(本地化)、Zookeeper)

    HMaster(主):

    负载均衡,管理分配region
    DDL:增删改->table,cf,namespace
    类似于namenode上管理一些元数据,这里管理table元数据
    ACL权限控制
    可以启动多个Master,但真正服务只有一个master服务,Master挂了,其他的启动,进行一个选主。数据和storm一样都在zookeeper上,主挂了,不影响数据的读,但影响数据的写,还会影响region的分裂,也就是负载均衡,和storm的fail-fast很像,真正数据在hdfs上,zookeeper上存储的是元数据

    请查看HDFS元数据解析还有,https://blog.csdn.net/qq_33624952/article/details/79341477
    https://blog.csdn.net/vintage_1/article/details/38391209
    HRegionSever(从):—-通常和DN部署在一起,本地化原则。

    管理和存放本地的Hregion
    读写HDFS,提供IO操作,维护Table数据
    本地化,MapReduce中,数据不移动计算框架移动,是为了尽量减少数据迁移,在HBASE里面,Hregion的数据尽量和数据所属的DataNode在一块,但是这个本地化不能总是满足和实现,因为region是不断移动的,插入,不断分裂,一旦分裂就变了,什么时候本地化可以从不能保证到保证的过度呢,也就是合并。。。。

    Zookeeper:

    在master与zookeeper之间,master与regionsever之间提供心跳机制
    保证集群中只有一个Master
    存储所有Region的入口(ROOT)地址——新版本取消 ,比如操作一个表,第一件事要找到这张表所对应的服务器,第二找到存在那几个region上,读的key,定位到记录在哪个region,regionserver也就确定了,如果缓存里面没有这个表,就要去想办法找这个regionserver,第一步就要找到zookeeper上的相应地址,zookeeper为了寻址存储这个入口数据
    实时监控Region Server的上下线信息,并通知Master

    Client:

    寻址:通过zk获取HRegion的地址,同时client具有缓存rowkey-》HRegion映射关系的功能(读的数据表的机器地址预存,减少网络交互)加速RegionServer的访问
    表的设计rowkey设计原则:
    长度:最大长度64kb,越短越好,尽量不超过16byte(机器64位——》内存对齐8字节,所以控制在8字节整数倍可以获得最大性能)
    过长:内存利用率降低,不能缓存太多value数据
    分散:建议rowkey前加hash散列字段(程序位置固定生成)、反转—-避免固定开头解决热点问题
    唯一性
    反转例:
    IP:地区相同分在一个region中,负载不均匀:
    把IP地址倒序存储—思考:电话号码、时间戳
    hash散列字段例:
    加密算法:Hash(crc32,md5)
    CF设计原则:
    数量:在业务要求下,尽量少数量:建议一到两个。
    一个文件:传统行存储做法,如果想要访问个别列数据时,需要遍历每一列,效率低下。
    多个文件:列存储做法,过多时影响文件系统效率。
    折中方案:将不同CF数据列分开存储,比如把经常访问的类似的数据字段列尽可能分配到同一CF中。
    当某个CF数据flush时,其他CF也会被关联触发flush,如果CF设计比较多,一旦产生连锁反应,会导致系统产生很多IO,影响性能。
    flush和region合并时触发最小单位都是region,如果memstore数据少量时没有必要做flush
    Hbase容错在HMaster和HRegionSever连接到ZooKeeper后,通过创建Ephemeral临时节点,并使用Heartbeat机制维持这个节点的存活状态,如果某个Ephemeral节点失效,则Hmaster会收到通知,并作相应处理。
    HLog
    除了HDFS存储信息,HBase还在Zookeeper中存储信息,其中的znode信息:

    /hbase/root-region-server ,Root region的位置
    /hbase/table/-ROOT-,根元数据信息
    /hbase/table/.META.,元数据信息
    /hbase/master,当选的Mater
    /hbase/backup-masters,备选的Master
    /hbase/rs ,Region Server的信息
    /hbase/unassigned,未分配的Region
    Master容错:Zookeeper重新选择一个新的Master;无Master过程中,数据读取仍照常进行,region切分、负载均衡等无法进行
    Region Server容错:定时向Zookeeper汇报心跳,如果一旦时间内未出现心跳,Master将该RegionServer上的 Region重新分配到其他RegionServer上,失效服务器上“预写”日志由主服务器进行分割 并派送给新的RegionServer
    Zookeeper容错:一般配置3或5个Zookeeper实例

    Hbase操作安装笔记:http://note.youdao.com/noteshare?id=45a15ce17d5e6c0ead6db5f806d600d3&sub=3CB5F03947634E38811A45AE080F9028
    Hbaseshell操作
    pythonHbase
    mr+Hbase
    Java+Hbase
    Scala+Hbase
    Hive+Hbase
    1 留言 2019-02-08 14:31:02 奖励16点积分
  • 使用VS2013搭建内核开发环境并使用VS2013自带的WinDbg双机调试有源码的驱动程序 精华

    背景想要学习内核 Rootkit 开发,那么第一件事就是要搭建好开发环境,第二件事情就是要了解如何调试驱动代码。这两点区别,和使用 VS2013 开发应用程序完全不同。在内核下,我们使用的是 WinDbg 来双机调试。所谓的双机调试,就是指有两台计算机,一台计算机上面运行要调试的程序,另一台计算机上面运行 WinDbg 来调试程序,两台计算机之间可以通过串口通信。
    本文介绍的是使用 VS2013 开发环境开发驱动程序的环境配置,以及使用 VMWare 虚拟机搭建双机调试环境,实现使用 VS2013 开发环境自带的 WinDbg 调试开发的有源码的驱动程序。现在,我就把实现过程整理成文档,分享给大家。
    实现过程使用 VS2013 开发驱动程序VS2013 要进行驱动开发,必须先安装 WDK8.1,可以在 微软驱动开发官网 上进行下载。注意,下载的 WDK 一定要对应自己的 VS 版本,VS2013 就下载 WDK8.1。
    安装完毕 WDK8.1 之后,我们就可以使用 VS2013 创建驱动项目工程,开发驱动程序了。具体的步骤为:
    1.运行 VS2013 开发环境,点击菜单栏“文件” —> “新建” —> “项目” —> “模板” —> “Visual C++” —> “Windows Driver” —> “Empty WDM Driver”。要注意的是,WDK8.1 提供的模板中没有提供 NT 驱动模板,但是我们可以新建 WDM空模板 工程,然后向工程项目中添加头文件、代码文件,编译链接之后,生成的驱动程序就是 NT 驱动了。

    2.建立工程后,首先会有两个工程,一个就是驱动工程,另外一个是 Package 工程(这个是测试驱动安装的一个工程,对于 NT 驱动来说其实没有什么用处,可以直接删除)。驱动工程中会帮你建立一个 inf 文件,NT是使用不到的(当然新一代的过滤驱动,例如 Minifilter 是使用的,VS2013 支持直接创建 Minifilter 工程),可以直接删除。
    3.由于创建的是一个空项目,所以需要我们自己手动添加代码文件。直接添加一个Driver.c(有很多人说使用C++开发驱动,但是个人还是觉得使用 C 开发比较适合,因为微软内核使用的也是 C,而且 C 是能够直接操作内存。),并声明头文件、编写入口点函数 DriverEntry:

    4.接下来,编译驱动代码,报错。没有关系,查看出错原因,无外乎一些警告被当做错误,或者一些函数参数没有被使用,导致编译不过,这些都是因为安全警告等级太高了,我们可以分两种方式解决:一是将所有的警告和安全措施,全部都做到。例如没有使用的参数使用宏UNREFERENCED_PARAMETER等等。要做到这些,有时候基本没有办法写程序。二是降低警告等级。具体步骤为:
    1.打开项目属性页;
    2.C/C++ —> 常规 —> 警告等级选择“等级3(/W3)” —> 将警告视为错误选择“否(/WX-)”;

    3.链接器 —> 常规 —> 将链接器警告视为错误选择“否(/WX:NO)”;

    4.Driver Signing —> General —> Sign Mode 选择“Off”。

    设置完毕后,再编译链接驱动代码,成功生成 .sys 驱动程序。接下来,我们就开始讲解 WinDbg 双机调试。
    双机调试VMware虚拟机设置1.打开相应 WMware 虚拟机上的 “Edit virtaul machine settings”。

    2.“Hardware”选项中 —> 点击“Add”,添加一个串口设备 Serial Port 。如果有打印机(Printer)存在,则先移除虚拟机的 打印机 硬件,然后再添加串口设备 Serial Port,因为打印机会占用串口 COM1。

    3.“Next”,在“Serial Port” 里选中 “Output to named pipe”。

    4.“next”,然后如下设置:

    5.“Finish”之后,回到如下“Virtual Machine Settings”页面,在“I/O Mode” 里选中“Yield CPU on poll”。

    6.点击“OK”之后,WMware 虚拟机设置就完成了。接下来,我们开机,进入虚拟机系统中,并对虚拟机系统进行设置,将其设置成调试模式。
    虚拟机里的操作系统设置1.如果操作系统不是 Win10,则开机进入桌面后,在运行窗口输入 msconfig —> 引导 —> 高级选项 —> 调试 —> 确定。

    2.如果操作系统是 Win10,则
    1.在设置 —> 安全和更新 —> 针对开发人员 —> 开发人员模式;

    2.管理员身份运行CMD,输入 bcdedit /set testsigning on 开启测试模式;

    3.在运行窗口输入 msconfig —> 引导 —> 高级选项 —> 调试 —> 确定;

    3.关机重启,这样虚拟机里的操作系统就设置完成了。接下来,就开始配置 VS2013,使用 VS2013 上的 WinDbg 调试程序。
    VS2013 驱动调试配置1.点击菜单栏“DRIVER” —> “Test” —> “Configure Computers…”;

    2.然后,点击“Add New Conputer”,添加配置信息。

    3.接着,选中“Manually configure debuggers and do not provision”,点击“下一步”;

    4.最后,选中“Serial”通信,波特率为“115200”,勾选“Pipe”,勾选“Reconnect”,管道名称为“\.\pipe\com_1”,目标端口为“com1”;这只完后,点击下一步即可完成 VS2013 上面的调试配置。

    开始正式调试当配置好 VMware 虚拟机环境以及 VS2013 上面的调试环境之后,我们就可以来调试驱动程序了。调试方法和调试应用程序一样,先下断点、然后运行程序,程序执行到断点处就会停下。但在双机调试下,就会变成:
    1.首先,我们使用快捷键 F9 在驱动程序代码上下断点,再按下 F5 运行程序,会弹出提示框,我们选择继续调试:


    2.这时,驱动程序就一直处于 waiting to reoonnect… 状态。要特别特别特别注意:在 VMware 虚拟机中加载驱动程序之前,我们先在 VS2013 暂停下调试,暂停成功后,再按 F5 继续调试,这时才去 VMware 中加载驱动程序。
    如果不先暂停的话,加载运行驱动程序,代码断点不能成功断下来。暂停之后,可以成功断下来。这可能是一个 Bug 吧,反正大家如果遇到断不下来的情况,都先试试先暂停,再运行调试这种办法吧。


    3.我们到 VMware 中加载运行驱动程序,VS2013 成功在断点处断下。那么,这时,我们就可以使用快捷键 F5、F9、F10、F11,像调试应用层序那样调试驱动程序了。

    总结步骤并不复杂,只是啰嗦而已。大家细心点跟着上述教程,认真操作就可以成功开发驱动程序,并使用 WinDbg 实现双机调试驱动程序源码。
    1 留言 2019-02-09 11:38:26 奖励11点积分
  • 大数据6.Hive

    前文链接:https://write-bug.com/article/2091.html
    Hive原理
    一种Sql解析引擎—>把Sql语句解析成MR Job(本质就是MR)
    Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是Hadoop的目录/文件, 达到了元数据与数据存储分离的目的,本身不存储数据,真实数据存储在HDFS上,元数据存在MySql上
    读多写少,不支持数据改写和删除
    Hive中没有定义专门的数据格式,由用户指定,需要指定三个属性:

    列分隔符 ‘ ’ ‘\t’ ‘\001’
    行分隔符 \n
    读取文件数据的方法 TextFile、SquenceFile(二进制文件\<k,v>—-Java)、RCFile(面向列)


    为什么有了MR还用Hive?
    面向非开发人员,简单有效:
    word_countselect word, count(*) from ( select explode(split(sentence, ' ')) as word from article ) t group by wordHql和Sql区别

    可拓展性(用户自定义函数):

    UDF 普通函数 类比:map 一对一
    UDAF 聚合函数 类比:groupByKey 多对一
    UDTF 表生成函数 类比:flat Map 一对多

    数据检查:

    Hql:读时模式 读慢(读时检查、解析字段) load快(写时不需要加载数据)
    Sql:写时模式 读快 load慢(写时做索引、压缩、数据一致性、字段检查等等)

    Hive体系架构
    Cli用户接口, 进行交互式执行SQL: 直接与Driver进行交互

    JDBC驱动,作为JAVA的API:JDBC是 通过Thift Server来接入,然后发送给Driver

    Driver sql->MR

    元数据:是一个独立的关系型数 据库,通常Mysql,Hive会在其中保存表模式和其他系统元数据

    存储模式:
    derby(默认,本地单用户模式)—存储元数据结构信息meta store
    mysql 多用户模式

    本地
    远程 —-独立机器,互相解耦

    数据管理方式hive的表本质就是Hadoop的目录/文件 – hive默认表存放路径一般都是在你工作目录的hive目录里面,按表名做文件夹分开,如果你 有分区表的话,分区值是子文件夹,可以直接在其它的M/R job里直接应用这部分数据。

    Hive的create创建表的时候,选择的创建方式:

    create table 内部表
    create external table外部表(删表时只删除元数据)

    Partition

    分区表(/tablename/条件字段)表中的一个 Partition 对应于表下的一个目录,所有的 Partition 的 数据都存储在对应的目录中
    例如:pvs 表中包含 ds 和 city 两个 Partition,则

    对应于 ds = 20090801, ctry = US 的 HDFS 子目录为:/wh/pvs/ds=20090801/ctry=US;
    对应于 ds = 20090801, ctry = CA 的 HDFS 子目录为;/wh/pvs/ds=20090801/ctry=CA

    作用:辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的 规格和条件进行管理。
    Bucket
    —分桶表(clusted by user into 32 buckets /lbs/mobile_user/action=insight/day=20131020/part-00000)数据采样sampling、控制数量
    —CLUSTERED BY —-bucket中的数据可以通过‘SORT BY’排序。
    ‘set hive.enforce.bucketing = true’ 可以自动控制上一轮reduce的数量从而适 配bucket的个数,
    当然,用户也可以自主设置mapred.reduce.tasks去适配 bucket个数
    —tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
    查看sampling数据: – hive> select * from student tablesample(bucket 1 out of 2 on id); – tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y) – y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32 时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽 取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据 ,分别为第3个bucket和第(3+16=)19个bucket的数据
    数据类型
    原生

    TINYINT
    SMALLINT
    INT
    BIGINT
    BOOLEAN
    FLOAT
    DOUBLE
    STRING
    BINARY(Hive 0.8.0以上才可用)
    TIMESTAMP(Hive 0.8.0以上才可用)

    复合

    Arrays:ARRAY<data_type>
    Maps:MAP<primitive_type, data_type>
    Structs:STRUCT<col_name: data_type[COMMENT col_comment],……>
    Union:UNIONTYPE<data_type, data_type,……>

    例:Map
    张三 “数学”:80,“语文”:90,“英语”:79
    李四 “数学”:70,“语文”:60,“英语”:79
    name scoreselect score from table{“数学”:80,“语文”:90,“英语”:79}{ “数学”:70,“语文”:60,“英语”:79}select name,score[‘数学’]from table
    张三 80
    李四 70
    例:Sql实现mr_join:
    INSERT OVERWRITE TABLE pv_usersSELECT pv.pageid,u.ageFROM page_view pvJOIN user uON(pv.userid=u.userid);
    例:Sql实现groupby —-mr_wordcountSELECT pageid,age,count(1)FROM pv_usersGROUP BY pageid,age
    Hive优化map优化优化并发个数减小
    set mapred.max.split.size=100000000;set mapred.min.split.size.per.node=100000000;set mapred.min.split.size.per.rack=100000000;set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;set mapred.map.tasks=10;—-适当加map数:几乎不起作用block大小会影响并发度set dfs.block.size(=128)hive.map.aggr=true ,相当于开启Combiner功能
    reduce优化优化并发个数
    set mapred.reduce.tasks=10
    mapreduce出现痛点:只有一个reduce的情况

    没有group by
    order by,建议用distribute by和sort by来替代

    order by:全局排序,因此只有一个reduce,当输入数据规模较大时,计算时间消耗严重sort by:不是全局排序,如果用sort by排序,并且设置多个reduce,每个reduce输出是有序的,但是不保证全局排序distribute by:控制map端的数据如果拆分给redcuce,可控制分区文件的个数cluster by:相当于distribute by和sort by的结合,但是只默认升序排序

    例子1:select * from TableA distribute by userid sort by itemid;
    例子2:select from TableA distribute by itemid sort by itemid asc;select from TableA cluster by itemid;

    笛卡尔积使用join的时候,尽量有效的使用on条件
    mapreduce出现痛点:如何加快查询速度(横向尽可能多并发,纵向尽可能少依赖)
    分区Partition
    Map Join:指定表是小表,内存处理,通常不超过1个G或者50w记录
    union all:先把两张表union all,然后再做join或者group by,可以减少mr的数量union 和 union all区别:union相当记录合并,union all不合并,性能后者更优
    multi-insert & multi group by查一次表完成多个任务
    Automatic merge:为了多个小文件合并
    – hive.merge.mapfiles = true 是否和并 Map 输出文件,默认为 True– hive.merge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False– hive.merge.size.per.task = 25610001000 合并文件的大小
    Multi-Count Distinct一个MR,拆成多个的目的是为了降低数据倾斜的压力必须设置参数:set hive.groupby.skewindata=true; 强制拆分,万能参数
    并行执行:set hive.exec.parallel=true 独立无依赖的MR

    mapreduce出现痛点:如何加快join操作
    语句优化:多表连接:如果join中多个表的join key是同一个,则join会转化为单个mr任务表的连接顺序:指定大、小表Hive默认把左表数据放到缓存中,右边的表的数据做流数据
    如果避免join过程中出现大量结果,尽可能在on中完成所有条件判断

    mapreduce解决数据倾斜问题
    操作 • Join • Group by • Count Distinct原因 • key分布不均导致的 • 人为的建表疏忽 • 业务数据特点症状 • 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。• 查看未完成的子任务,可以看到本地读写数据量积累非常大,通常超过10GB可以认定为发生数据倾斜。倾斜度 • 平均记录数超过50w且最大记录数是超过平均记录数的4倍。 • 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍。 万能方法 • hive.groupby.skewindata=true
    1、大小表关联
    Small_table join big_table
    2、大大表关联
    userid为0或null等情况,两个表做join

    方法一:业务层面干掉0或null的user
    方法二:sql层面key—tostring—randomon case when (x.uid = ‘-‘ or x.uid = ‘0‘ or x.uid is null) then concat(‘dp_hive_search’,rand()) else x.uid end = f.user_id;
    方法三:业务削减:先查询当天用户当小表做join

    3、聚合时存在大量特殊值
    select cast(count(distinct(user_id))+1 as bigint) as user_cntfrom tab_awhere user_id is not null and user_id <> ‘’
    4、空间换时间
    Select day,count(distinct session_id),count(distinct user_id) from log a group by day
    同一个reduce上进行distinct操作时压力很大
    select day, count(case when type='session' then 1 else null end) as session_cnt, count(case when type='user' then 1 else null end) as user_cnt from ( select day,session_id,type from ( select day,session_id,'session' as type from log union all select day user_id,'user' as type • from log ) group by day,session_id,type ) t1 group by day搭建参考:http://note.youdao.com/noteshare?id=22ac246cfddfe1e3f1a400a01a8f578a\&sub=34A84DA642E245E28E4C7B6172CD69FC
    实践:

    导入数据并统计
    join
    UDF
    从HDFS导入
    insert导入
    查询插入
    导出(local+hdfs)
    partition
    hive+hbase
    2 留言 2019-02-03 11:39:01 奖励15点积分
  • 大数据5-1.spark-core

    前文链接:https://write-bug.com/article/2090.html
    下面开始新的章节:Spark
    MapReduce:分布式批量计算框架——对HDFS的强依赖。Spark-core:基于内存的分布式批量计算引擎/框架

    Spark的运行模式:

    单机模式:方便人工调试程序
    ./bin/run-example SparkPi 10 --master local[2] //一个进程模拟一个机器Standalone模式:自己独立一套集群(master/client/slave) 缺点:资源不利于充分利用
    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 lib/spark-examples-1.6.0-hadoop2.6.0.jar 100– Master/Slave结构• Master:类似Yarn中的RM• Worker:类似Yarn中的NM
    Yarn模式:

    Yarn-Client模式:Driver(spark Context)运行在本地Client。适合交互调试---本地和页面特定端口。



    Yarn-Cluster模式:Driver运行在集群(AM)。正式提交任务的模式(remote)--  yarn-cluster

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/sparkexamples-1.6.0-hadoop2.6.0.jar 10Yarn Cluster vs. Yarn Client区别:本质是AM进程的区别,cluster模式下,driver运行在AM中,负责向Yarn申请资源 ,并监督作业运行状况,当用户提交完作用后,就关掉Client,作业会继续在yarn上运行。然而cluster模式不适合交互 类型的作业。而client模式,AM仅向yarn请求executor,client会和请求的container通信来调度任务,即client不能离开。
    Master/Slave结构

    RM:全局资源管理器,负责系统的资源管理和分配
    NM:每个节点上的资源和任务管理器
    AM:每个应用程序都有一个,负责任务调度和监视,并与RM调度器协商为任务获取资源


    Hadoop1.0中:

    一个MapReduce程序就是一个job,而一个job里面可以有一个或多个Task,Task又可以区分为Map Task和Reduce Task
    MapReduce中的每个Task分别在自己的进程中运行,当该Task运行完时,进程也就结束



    Spark中作业运行原理:

    Application:spark-submit提交的程序 ,一个Spark应用程序由一个Driver端和多个Executor组成,多个task 以多线程的形式运行在Executor中。
    Driver(AM):完成任务的调度以及executor和cluster manager进行协调

    Driver负责运行程序的main()方法,负责将代码发送到各个Executor去执行,然后取回结果,Driver端也是以Executor方式运行,可以运行在本地(Client),也可以运行在集群内部(Cluster),以Spark-shell形式运行的Spark应用程序,其Driver端都是运行在本地,以Spark-submit形式提交的作业可以通过参数- -deploy-mode来指定Driver端运行在哪里。 ———————-对应了集群中AM的功能
    构建SparkContext(Spark应用的入口,创建需要的变量,还包含集群的配置信息等)
    将用户提交的job转换为DAG图(类似数据处理的流程图)
    根据策略将DAG图划分为多个stage,根据分区从而生成一系列tasks
    根据tasks要求向RM申请资源
    提交任务并检测任务状态


    Executor(container): Executor是代码执行的地方,以进程的形式运行在节点之上(如standalone模式的Worker和yarn模式的NM)。每个worker node(NM)中有多个container(executor),是真正执行task的单元
    Task(线程): 一个Executor可以同时运行多个task(这里对应了上面说的一次提交多个任务,以多线程方式执行),每个task执行一个任务,窄依赖(split-》task一对一),其性质是以多线程的形式运行在Executor中。是Spark中最新的执行单元。RDD一般是带有partitions 的,每个partition在一个executor上的执行可以认为是一个 TaskCluster Manager(RM): 根据运行模式的不同(如Standalone,Apache Mesos,Hadoop YARN ,Kubernetes )其性质也会不同,主要是负责获取集群资源的外部服务(如standalone模式的Master和yarn模式的RM)。
    Job: 一个Action的触发即为一个job,一个job 存在多个task,和MR中Job不一样。MR中Job主要是Map或者Reduce Job。而Spark的Job其实很好区别,一个action算子就算一个 Job,比方说count,first等
    Stage:逻辑概念(dag) 分为水平关系(并行)和垂直关系(串),Job中如果有Shuffle(宽依赖,多对多)产生就会分为2个Stage,一个Job 可由多个Stage构成,一个Stage 可由多个task 组成,是spark中独有的。一般而言一个Job会切换成一定数量 的stage。各个stage之间按照顺序执行
    为什么一个application有多个job?比如分别跑用户特征和物品特征两个任务,这里可以用一个spark-submit同时提交两个任务--一个应用,捏合到一块并发执行
    在mr中,只能分别提交执行,串行。
    为什么一个Job有多个Stage?
    有宽依赖的,shuffle的时候是串行的stage (有partition和计算的中间结果),窄依赖的是一对一的是并行的stage。
    这里的并行和上面的两个job并行,像上面的特征任务,可以达到同样的效果,但是数据倾斜的时候就一般会变为两个job
    (万能方法:hive.groupby.skwindata=true)详情:Hive数据倾斜解决方法总结
    为什么一个Stage里有多个task?
    task ---线程
    Spark开发和MR开发
    MR:Map脚本

    Reduce脚本

    Spark:--Scala

    Scala基础参考:http://www.runoob.com/scala/scala-basic-syntax.html
    算子
    算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
    算子分类:

    Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
    Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。
    Action算子,这类算子会触发SparkContext提交Job作业。

    RDD有两种操作算子:

    Transformation(转换)--变换非为两类:窄依赖和宽依赖
    Ation(执行)

    transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD,Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作。action是得到一个值,或者一个结果(直接将RDDcache到内存中),触发Spark作业的运行,真正触发转换算子的计算。
    所有的transformation都是采用的懒策略,如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

    输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
    运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。 如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
    输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)。

    Transformation和Actions操作概况
    Transformation具体内容

    map(func) :返回一个新的分布式数据集,由每个原元素经过func函数转换后组成。---一对一的处理
    filter(func) : 返回一个新的数据集,由经过func函数后返回值为true的原元素组成。把不符合条件的数据过滤掉---过滤器
    flatMap(func) : 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)。---一对多,比如句子变token
    sample(withReplacement, frac, seed) : 根据给定的随机种子seed,随机抽样出数量为frac的数据。---采样
    union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成。---合并----unionall(不做去重)
    groupByKey([numTasks]) : 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task。---对某一字段key聚合 user-》(Sequence)item:score item:score item:score
    reduceByKey(func, [numTasks]) : 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。---对不同的主键,value做计算
    join(otherDataset, [numTasks]) : 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集。---两个表做聚合,根据key
    groupWith(otherDataset, [numTasks]) : 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。
    cartesian(otherDataset) : 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。
    sort
    partitionBy---分桶

    Actions具体内容

    reduce(func) : 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。
    collect() : 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM。----明文输出
    count() : 返回数据集的元素个数。---数据统计
    take(n) : 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素内存压力会增大,需要谨慎使用。
    first() : 返回数据集的第一个元素(类似于take(1))。
    saveAsTextFile(path) : 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本。
    saveAsSequenceFile(path) : 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)。
    foreach(func) : 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互。

    Spark会产生shuffle的算子---宽依赖

    去重(distinct)聚合(reduceByKey、groupBy、groupByKey、aggregateByKey、combineByKey)
    排序(sortBy、sortByKey)重分区(coalesce、repartition)集合或者表操作(intersection、subtract、subtractByKey、join、leftOuterJoin)

    Spark基于弹性分布式数据集(RDD)模型,具有良好的通用性、容错性与并行处理数据的能力
    RDD( Resilient Distributed Dataset ):弹性分布式数据集(相当于集合),它的本质是数据 集的描述(只读的、可分区的分布式数据集),而不是数据集本身 ----一块内存区域的格式化表达,描述数据前后依赖关系

    RDD使用户能够显式将计算结果保存在内存中,控制数据的划分,并使用更丰富的操作集合来处理
    使用更丰富的操作来处理,只读(由一个RDD变换得到另一个RDD,但是不能对本身的RDD修改)
    记录数据的变换而不是数据本身保证容错(lineage) ---DAG

    通常在不同机器上备份数据或者记录数据更新的方式完成容错,但这种对任务密集型任务代价很高
    RDD采用数据应用变换(map,filter,join),若部分数据丢失,RDD拥有足够的信息得知这部分数据是如何计算得到的,可通过重新计算来得到丢失 的数据
    这种恢复数据方法很快,无需大量数据复制操作,可以认为Spark是基于RDD模型的系统

    懒操作,延迟计算,action的时候才操作
    瞬时性,用时才产生,用完就释放
    Spark允许从以下四个方面构建RDD– 从共享文件系统中获取,如从HDFS中读数据构建RDD• val a = sc.textFile(“/xxx/yyy/file”)– 通过现有RDD转换得到• val b = a.map(x => (x, 1))– 定义一个scala数组• val c = sc.parallelize(1 to 10, 1)– 有一个已经存在的RDD通过持久化操作生成• val d = a.persist(), a. saveAsHadoopFile(“/xxx/yyy/zzz”)

    RDD的依赖关系

    窄依赖:不关心一条数据是从哪里来的,比如flatmap虽然是一到多,但是不关心纪录是哪里来的,只是把它分割为多条数据
    宽依赖:需要对key有一个严格的要求。有partition

    从后往前,将宽依赖 的边删掉,连通分量 及其在原图中所有依赖的RDD,构成一个 stage
    DAG是在计算过程 中 不 断 扩 展 , 在 action后才会启动计算
    每个stage内部尽可 能多地包含一组具有 窄依赖关系的转换, 并将它们流水线并行化(pipeline)


    RDD容错
    中间某一环节出现错误,部分数据失效(框架判断),这部分数据可以找到最近数据源(缓存)从新计算弥补回来
    一般如果要做cache的话,最好在shuffle类的算子后做cache,这类算子为宽依赖算子,数据很不容易得到,需要依赖于上游所有数据都准备数据之后才执行,哪怕有一块数据失效,都不能执行,为了保证下游失效不重新计算,在这里做一个cache。--既有内存又有磁盘

    优化
    每一个进程包含一个executor对象,每个executor对象包含一个线程池,每个线程执行一个tasks
    线程池好处:省去了频繁启停进程的开销
    Task并发度概念

    每一个节点可以启动一个或多个executor进程
    每个进程由多个core(相当于虚拟的cpu(通过配置配成但不能超过真实机核数))组成,每个core一次只能执行一个task
    调参:

    master yarn-cluster
    num-executors 100
    executor-memory 6G
    executor-cores 4
    driver-memory 1G

    内存划分

    20%:execution:执行内存---shuffle类算子会消耗内存,有的还需要cache缓存,如果内存满溢,把数据写到磁盘上(split)
    60%:Storage:存储cache、presist、broadcast数据(分发小数据集相当于-file---比如在本地完成一个mapside-join去代替处理大数据集的框架(小数据集没有必要使用,完全可以放在内存中实现))

    broadcast:分发小程序数据—mapsite-join

    适合做黑名单,白名单,字典。。。比如分发一个itemid-》itemname就可以在本地数据集里做一个id-》name 的转换
    20%:程序运行
    调参:

    conf spark.default.parallelism=1000每个stage默认的task数量
    conf spark.storage.memory Fraction=0.5默认0.6
    conf spark.shuffle.memory Fraction=0.3默认0.2

    剩下的百分比就是程序运行的内存

    spark1.6版本之前:每个类的内存是相互隔离的,导致了executor的内存利用率不高,需要人工调配参数百分比来优化内存
    spark1.6版本之后(包含):execution和storage内存是可以相互借用的(不够用时动态调整),减少了OOM的情况发生,但也需要参数值作为它的参考。---总内存空间够用的话就不用落地到磁盘了。

    开发调优

    原则一:避免创建重复的RDD
    原则二:尽可能复用同一个RDD
    原则三:对多次使用的RDD进行持久化处理
    原则四:避免使用shuffle类算子
    原则五:使用map-side预聚合的shuffle操作

    一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子
    思想类似MapReduce中的Combiner
    可能的情况下使用reduceByKey或aggregateByKey算子替代groupByKey算子,因为 reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本地相同的key进行 预聚合,而groupByKey算子不会预聚合

    原则六:使用Kryo优化序列化性能

    Kryo是一个序列化类库,来优化序列化和反序列化性能
    Spark默认使用Java序列化机制(ObjectOutputStream/ ObjectInputStream API)进行序列 化和反序列化
    Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右


    Spark技术栈Spark Core : 基于RDD提供操作接口,利用 DAG进行统一的任务规划
    Spark SQL : Hive的表 + Spark的里。通过把 Hive的HQL转化为Spark DAG计算来实现
    Spark Streaming : Spark的流式计算框架
    MLIB : Spark的机器学习库,包含常用的机器 学习算法
    GraphX : Spark图并行操作库
    Spark搭建参考:http://note.youdao.com/noteshare?id=e8c9e725586227986f7abaeb7bada719&sub=74FBCEE204FE48F29C0A17370010B37C
    实践
    Scala-word countScala-UIgroupScala-cf—协同过滤pyspark-wordcountpyspark-jieba_word_clusterpyspark-LR+NB
    2 留言 2019-01-30 17:17:54 奖励22点积分
  • 大数据4.Yarn

    前文链接:https://write-bug.com/article/2076.html
    在Haoop2.0中引入了Yarn这个分布式操作系统:
    在1.0中的 集群利用率过低,MR相当于灵魂的存在,想跑MR任务在一个HDFS上,想跑Spark.Storm又要创建个集群才行,不能共享资源,这时候我们想在一个集群中跑不同的计算框架任务这种需求,在1.0中就搞不定了
    1.0 组件相互独立,任务独立,整个的数据处理基本为MR包揽。
    2.0所有任务跑在一个集群,在这里,这些组件就相当于应用程序运行在其上,不同的计算框架任务都变成了一个很普通的插件了,这样就可以有效的利用集群资源了,yarn就相当于一个集群操作系统。

    作用:资源整合
    目的:系统资源利用最大化,在同一系统上运行不同的数据处理框架任务

    下面介绍一下Yarn的核心服务:

    ResouceManger—>RM资源管理器—->原来的Job Tracker
    Job—>application——-任务


    YARN负责管理集群上的各种资源,它也是master/slave结构:ResourceManager为master,NodeManager为slave,RM负责对NM上的资源进行统一管理和调度。当用户提交一个应用程序时(可以是MR程序、Spark程序或Storm程序等),会启动一个对应的ApplicationMaster用于跟踪和管理这个程序。它负责向RM申请资源,并在NM上启动应用程序的子任务。
    YARN的基本组成结构(组件)如下:
    1)ResourceManager——集群资源的仲裁者,1.0中Job Tracker的部分功能
    RM是一个全局的资源管理器,负责整个YARN集群上的资源管理和分配。它由如下两个组件构成:

    Scheduler(调度器):调度器根据各个应用的资源需求进行资源分配。资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示。—-可插拔的调度组件(有很多配置选项)。—不负责应用程序的监控和状态跟踪,也不保证应用程序失败或者硬件失败的情况下对Task的重启

    FIFO Scheduler 按提交顺序,大任务阻塞
    Capacity Scheduler 专有队列运转小任务,但是小任务预先占一定集群资源
    Fair Scheduler 动态调整,公平共享

    Applications Manager(应用程序管理器):应用程序管理器负责管理整个系统中的所有应用程序,如启动应用程序对应的ApplicaitonMaster、监控AM运行状态并在失败时重启它。

    2)ApplicationMaster
    当客户端提交一个应用程序至YARN集群上时,启动一个对应的AM用于跟踪和管理这个程序。AM的主要功能包括:

    向RM调度器请求资源
    用户级别的作业进程,负责整个作业的运转周期
    监控所有任务的运行状态,并在任务运行失败时重新为任务申请资源以重启任务
    本质是一个特殊的container(Boss也是人嘛)——-随时关闭和启动,不需要一直启动

    注:AM是YARN对外提供的一个接口,不同的计算框架提供该接口的实现,如MRAppMaster、SparkAppMaster等等,使得该类型的应用程序可以运行在YARN集群上。
    3)NodeManager
    NM是每个节点上的资源和任务管理器。NM的主要功能包括:

    周期性向RM汇报(心跳)本(单)节点上的资源使用情况和Container的运行状态
    接收并处理来自AM的任务启动/停止等各种请求
    管理和启动container
    RM相当于山高皇帝远啊,通过NM(管理本地)就可以管辖整个集群

    4)Container——-资源—类似1.0的slot—-上面跑着真正的任务
    Container是YARN中的资源分配单位,它将内存、CPU、磁盘、网络等(任务运行环境)资源封装在一起。当AM向RM申请资源时,RM为AM返回的资源便是用Container的形式运转AM的应用。运行中的Container可能是一个Map,也可能是一个Reducer,Container也得和AM通信,报告任务的状态和健康信息。(任务级别)

    例子:RM是一个租房的中介老板,container是房源,如果来一个老板租房做美容院,那么不管他的分店还是办公地点都是不同的资源,办公楼管理着分店,那么办公楼就是个AM,管理着每个店面的销售额(任务调度),每一个地区都有一个销售经理,对接着所管辖区域的销售工作(但是NM同时一直监控管辖区域的房子完整和任务工作(自己店面资源)),这个销售经理就是个NM,然后美容店就可以开业运转了。

    1.0中JobTracker 是个独裁分子(资源管理、作业调度及监控),2.0Yarn中把其权力分权:RM、AM
    一个任务如果需要多个Container,则通过最小容量单位的多个Container来组成(为甚么yarn中存在虚拟Cpu的概念:根据具体硬件配置的不同,但也需根据实际情况)
    Yarn中优化:当MR执行结束,可以释放map的Container,请求更多的Reducer的Container
    不管是什么职位,大家工位都一样了,对于提交给集群的每个作业,会启动一个专用的、短暂的 JobTracker 来控制该作业中的任 务的执行,短暂的 JobTracker 由在从属节点上运行的 TaskTracker 启动不管AM(HR)在那个工位,他自己管理的团队和项目都能正常运转。
    Yarn优势

    减小了 JobTracker(也就是现在的 RM)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美
    AM是一个可变更的部分,用户可以对不同的编程模型写自己的 AM,让更多类 型的编程模型(更多计算框架,不只是MR)能够跑在 Hadoop 集群中
    对于资源的表示以内存为单位,比之前以剩余 slot 数目更合理
    老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况 ,现在,这个部分就扔给 ApplicationMaster 做了
    资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资 源闲置的尴尬情况(map释放后可以用于reduce)

    容错能力:RM挂掉:HA—-zookeeper
    NM挂掉:心跳告知RM,RM通知AM,AM进一步处理
    AM挂掉:RM负责重启,RM上有一个RMAM,是AM的AM,保存了已完成的信息,不再运行

    HDFS对Map Reduce彻底的重构—>MRv2或者Yarn
    MRv1: 不适合场景:实时(storm)、迭代(机器学习,模型训练)、图计算场景(Giraph)
    MRv2执行流程
    1、作业提交阶段
    作业提交阶段主要在客户端完成,过程如下图:

    当作业提交成功后,在HDFS上可以查看到相关作业文件:

    这些文件都是在Client端生成并上传至HDFS,对这些作业文件的说明如下:

    作业jar包:job.jar
    作业输入分片信息:job.split、job.splitmetainfo
    作业配置信息:job.xml

    这些作业文件存放在/tmp/hadoop-yarn/staging/hadoop/.staging/job_1435030889365_0001路径下,该路径又称为作业提交路径submitJobDir,它包括如下两部分:

    jobStagingArea:/tmp/hadoop-yarn/staging/hadoop/.staging是作业的staging目录
    jobID:唯一标识集群上的一个MR 任务

    2、作业的初始化阶段

    对上图更详细的表述如下:

    步骤1:当MR作业提交至YARN后,RM为该作业分配第一个Container,并与对应的NM通信,在Container中启动作业的MRAppMaster。
    步骤2:MRAppMaster向RM注册自己。这使得客户端可以直接通过RM查看应用程序的运行状态。
    步骤3:MRAppMaster读取作业的输入分片。作业的分片数(split)决定了启动的map任务数。

    3、作业执行阶段

    对上图更详细的表述如下:

    步骤1:MRAppMaster采用轮询的方式向RM申请任务所需资源。
    步骤2:资源分配成功后,MRAppMaster就与对应的NM通信,并启动YarnChild来执行具体的Map任务和Reduce任务。在任务运行前,还要将任务所需文件本地化(包括作业Jar包、配置信息等)。

    注:每个输入分片对应一个Map任务;Reduce任务数由mapreduce.job.reduces属性确定。

    步骤3:在YARN上运行时,任务每3秒钟向AM汇报进度和状态。
    步骤4:应用程序运行完成后,AM向RM注销自己,并释放所有资源。

    至此,MR作业执行结束。下图附上《Hadoop权威指南》中关于YARN上运行MR作业的流程图

    说明:上述步骤并没有考虑Shuffle过程,这是为了方便表述。
    1 留言 2019-01-30 17:03:55 奖励18点积分
  • 大数据3-2 HDFS2.0

    前文链接:https://write-bug.com/article/2058.html
    这节我们来介绍下HDFS2.0是如何一步步去解决1.0中存在的问题的,并在此基础上介绍2.0存储原理机制:
    首先我们先列出几个2.0的新特性,由这个为主线:

    NameNode HA 数据高可用
    NameNode Federation 联邦
    HDFS 快照
    HDFS 缓存
    HDFS ACL


    在Hadoop1.0整个集群只有一个NN,存在单点故障问题,一旦NN挂掉,整个集群也就无法使用了,而在1.0中的解决方法不管是NFS还是SNN都不是一个很好的解决办法,这个上节已经说了,所以HDFS2.0首当其冲的就是这个单点故障问题,引入了Name Node High Available(NNHA)高可用,不同于1.0中的不可靠实现(SNN),NNHA中有两个Name Node(分别为:ActiveName Node和StandByName Node),顾名思义,一个为工作状态一个为待命状态,当ANN所在服务器宕机时,SNN可以在数据不丢失的情况下,手工或自动切换并提供服务。
    ANN在集群中和原来的NN一样,承担着所有与客户端的交互操作,而SBNN在集群中不需要做任何交互操作,那他做什么呢?它只需要与ANN做好通信工作,什么意思,就是ANN数据发生变化时,SBNN和它做一个数据同步工作,这样就算我的ANN挂掉,SBNN可以随时顶上去成为新的ANN,提供快速的故障恢复。
    所以这里就引出一个问题:
    如何保持集群中两个NN的数据同步?HDFS2.0完整架构图:

    我们先来看下上面架构这个图,经过我们前面1.0的讲解,虽然这幅图确实发生了很大变化,但是我们先看下面一部分和前面还是很相似的,在1.0中我的NN和DN是通过心跳机制进行通信,DN去汇报block的健康情况、更新情况和任务进度,这里也不例外,只是DN同时向两个NN发送心跳罢了,目的就是保证两个NN维护的数据保持一致。而这里同步的数据是block->datanode的映射信息,每次心跳自动上报到NN中,所以NN中不必存储此类元数据。但是只靠心跳机制去维护还不够,因为还有内存中的path->blocklist去如何维护呢?两种方法:

    NFS(network file system):运维中的网络远程挂载目录,可以保证两台不同机器可以共享访问同一网络文件系统。(数据保存在了公共节点机器上)

    缺点:定制化硬件设备:必须是支持NAS的设备才能满足需求。复杂化部署过程:在部署好NameNode后,还必须额外配置NFS挂载、定制隔离脚本,部署易出错。简陋化NFS客户端:Bug多,部署配置易出错,导致HA不可用
    属于操作系统支持的配置——-fsimage镜像文件和edits编辑日志文件。

    QJM(基于Paxos(基于消息传递的一致性算法))最低法定人数管理机制:Hadoop自身提供的服务,借助了zookeeper,相当于把公共数据写在了zookeeper上,从而去维护两个NN上的log文件。

    属于应用软件级别的配置——用2N+1台机器存储editlog

    NameNode 为了数据同步,会通过一组称作Journal Nodes的独立进程进行相互通信。当active状态的Name Node命名空间有任何修改时,会告知大部分的JN进程,standby状态的Name Node再去读取JNs中的变更信息,并且一直监控editlog 中的变化,把变化应用到自己的命名空间,这样就可以确保集群出错时,命名空间状态已经是同步的了。
    Hadoop的元数据主要作用是维护HDFS文件系统中文件和目录相关信息。元数据的存储形式主要有3类:内存镜像、磁盘镜像(FSImage)、日志(EditLog)。在Namenode启动时,会加载磁盘镜像到内存中以进行元数据的管理,存储在NameNode内存;磁盘镜像是某一时刻HDFS的元数据信息的快照,包含所有相关Datanode节点文件块映射关系和命名空间(Namespace)信息,存储在NameNode本地文件系统;日志文件记录client发起的每一次操作信息,即保存所有对文件系统的修改操作,用于定期和磁盘镜像合并成最新镜像,保证NameNode元数据信息的完整,存储在NameNode本地和共享存储系统(QJM)中。
    如下所示为NameNode本地的EditLog和FSImage文件格式,EditLog文件有两种状态: inprocess和finalized, inprocess表示正在写的日志文件,文件名形式:editsinprocess[start-txid] finalized表示已经写完的日志文件,文件名形式:edits[start-txid][end-txid]; FSImage文件也有两种状态, finalized和checkpoint, finalized表示已经持久化磁盘的文件,文件名形式: fsimage[end-txid], checkpoint表示合并中的fsimage, 2.x版本checkpoint过程在Standby Namenode(SNN)上进行,SNN会定期将本地FSImage和从QJM上拉回的ANN的EditLog进行合并,合并完后再通过RPC传回ANN。
    name/├── current│ ├── VERSION│ ├── edits_0000000003619794209-0000000003619813881│ ├── edits_0000000003619813882-0000000003619831665│ ├── edits_0000000003619831666-0000000003619852153│ ├── edits_0000000003619852154-0000000003619871027│ ├── edits_0000000003619871028-0000000003619880765│ ├── edits_0000000003619880766-0000000003620060869│ ├── edits_inprogress_0000000003620060870│ ├── fsimage_0000000003618370058│ ├── fsimage_0000000003618370058.md5│ ├── fsimage_0000000003620060869│ ├── fsimage_0000000003620060869.md5│ └── seen_txid└── in_use.lock
    上面所示的还有一个很重要的文件就是seen_txid,保存的是一个事务ID,这个事务ID是EditLog最新的一个结束事务id,当NameNode重启时,会顺序遍历从edits_0000000000000000001到seen_txid所记录的txid所在的日志文件,进行元数据恢复,如果该文件丢失或记录的事务ID有问题,会造成数据块信息的丢失。
    HA其本质上就是要保证主备NN元数据是保持一致的,即保证fsimage和editlog在备NN上也是完整的。元数据的同步很大程度取决于EditLog的同步,而这步骤的关键就是共享文件系统,下面开始介绍一下关于QJM共享存储机制。
    QJM原理QJM全称是Quorum Journal Manager, 由JournalNode(JN)组成,一般是奇数点结点组成。每个JournalNode对外有一个简易的RPC接口,以供NameNode读写EditLog到JN本地磁盘。当写EditLog时,NameNode会同时向所有JournalNode并行写文件,只要有N/2+1结点写成功则认为此次写操作成功,遵循Paxos协议。其内部实现框架如下:

    从图中可看出,主要是涉及EditLog的不同管理对象和输出流对象,每种对象发挥着各自不同作用:
    FSEditLog:所有EditLog操作的入口JournalSet: 集成本地磁盘和JournalNode集群上EditLog的相关操作FileJournalManager: 实现本地磁盘上 EditLog 操作QuorumJournalManager: 实现JournalNode 集群EditLog操作AsyncLoggerSet: 实现JournalNode 集群 EditLog 的写操作集合AsyncLogger:发起RPC请求到JN,执行具体的日志同步功能JournalNodeRpcServer:运行在 JournalNode 节点进程中的 RPC 服务,接收 NameNode 端的 AsyncLogger 的 RPC 请求。JournalNodeHttpServer:运行在 JournalNode 节点进程中的 Http 服务,用于接收处于 Standby 状态的 NameNode 和其它 JournalNode 的同步 EditLog 文件流的请求。
    下面具体分析下QJM的读写过程。
    QJM 写过程分析
    上面提到EditLog,NameNode会把EditLog同时写到本地和JournalNode。写本地由配置中参数dfs.namenode.name.dir控制,写JN由参数dfs.namenode.shared.edits.dir控制,在写EditLog时会由两个不同的输出流来控制日志的写过程,分别为:EditLogFileOutputStream(本地输出流)和QuorumOutputStream(JN输出流)。写EditLog也不是直接写到磁盘中,为保证高吞吐,NameNode会分别为EditLogFileOutputStream和QuorumOutputStream定义两个同等大小的Buffer,大小大概是512KB,一个写Buffer(buffCurrent),一个同步Buffer(buffReady),这样可以一边写一边同步,所以EditLog是一个异步写过程,同时也是一个批量同步的过程,避免每写一笔就同步一次日志。
    这个是怎么实现边写边同步的呢,这中间其实是有一个缓冲区交换的过程,即bufferCurrent和buffReady在达到条件时会触发交换,如bufferCurrent在达到阈值同时bufferReady的数据又同步完时,bufferReady数据会清空,同时会将bufferCurrent指针指向bufferReady以满足继续写,另外会将bufferReady指针指向bufferCurrent以提供继续同步EditLog。上面过程用流程图就是表示如下:

    这里有一个问题,既然EditLog是异步写的,怎么保证缓存中的数据不丢呢,其实这里虽然是异步,但实际所有日志都需要通过logSync同步成功后才会给client返回成功码,假设某一时刻NameNode不可用了,其内存中的数据其实是未同步成功的,所以client会认为这部分数据未写成功。
    第二个问题是,EditLog怎么在多个JN上保持一致的呢。下面展开介绍。
    日志同步
    这个步骤上面有介绍到关于日志从ANN同步到JN的过程,具体如下:

    执行logSync过程,将ANN上的日志数据放到缓存队列中
    将缓存中数据同步到JN,JN有相应线程来处理logEdits请求
    JN收到数据后,先确认EpochNumber是否合法,再验证日志事务ID是否正常,将日志刷到磁盘,返回ANN成功码
    ANN收到JN成功请求后返回client写成功标识,若失败则抛出异常

    隔离双写:
    在ANN每次同步EditLog到JN时,先要保证不会有两个NN同时向JN同步日志。这个隔离是怎么做的。这里面涉及一个很重要的概念Epoch Numbers,很多分布式系统都会用到。Epoch有如下几个特性:
    当NN成为活动结点时,其会被赋予一个EpochNumber每个EpochNumber是惟一的,不会有相同的EpochNumber出现EpochNumber有严格顺序保证,每次NN切换后其EpochNumber都会自增1,后面生成的EpochNumber都会大于前面的EpochNumber
    QJM是怎么保证上面特性的呢,主要有以下几点:
    第一步,在对EditLog作任何修改前,QuorumJournalManager(NameNode上)必须被赋予一个EpochNumber第二步, QJM把自己的EpochNumber通过newEpoch(N)的方式发送给所有JN结点第三步, 当JN收到newEpoch请求后,会把QJM的EpochNumber保存到一个lastPromisedEpoch变量中并持久化到本地磁盘第四步, ANN同步日志到JN的任何RPC请求(如logEdits(),startLogSegment()等),都必须包含ANN的EpochNumber第五步,JN在收到RPC请求后,会将之与lastPromisedEpoch对比,如果请求的EpochNumber小于lastPromisedEpoch,将会拒绝同步请求,反之,会接受同步请求并将请求的EpochNumber保存在lastPromisedEpoch
    这样就能保证主备NN发生切换时,就算同时向JN同步日志,也能保证日志不会写乱,因为发生切换后,原ANN的EpochNumber肯定是小于新ANN的EpochNumber,所以原ANN向JN的发起的所有同步请求都会拒绝,实现隔离功能,防止了脑裂。
    NN切换时 恢复in-process日志
    为什么要这步呢,如果NN切换了,可能各个JN上的EditLog的长度都不一样,需要在开始写之前将不一致的部分恢复。恢复机制如下:

    ANN先向所有JN发送getJournalState请求;
    JN会向ANN返回一个Epoch(lastPromisedEpoch);
    ANN收到大多数JN的Epoch后,选择最大的一个并加1作为当前新的Epoch,然后向JN发送新的newEpoch请求,把新的Epoch下发给JN;
    JN收到新的Epoch后,和lastPromisedEpoch对比,若更大则更新到本地并返回给ANN自己本地一个最新EditLogSegment起始事务Id,若小则返回NN错误;
    ANN收到多数JN成功响应后认为Epoch生成成功,开始准备日志恢复;
    ANN会选择一个最大的EditLogSegment事务ID作为恢复依据,然后向JN发送prepareRecovery; RPC请求,对应Paxos协议2p阶段的Phase1a,若多数JN响应prepareRecovery成功,则可认为Phase1a阶段成功;
    ANN选择进行同步的数据源,向JN发送acceptRecovery RPC请求,并将数据源作为参数传给JN。
    JN收到acceptRecovery请求后,会从JournalNodeHttpServer下载EditLogSegment并替换到本地保存的EditLogSegment,对应Paxos协议2p阶段的Phase1b,完成后返回ANN请求成功状态。
    ANN收到多数JN的响应成功请求后,向JN发送finalizeLogSegment请求,表示数据恢复完成,这样之后所有JN上的日志就能保持一致。

    数据恢复后,ANN上会将本地处于in-process状态的日志更改为finalized状态的日志,形式如edits[start-txid][stop-txid]。
    通过上面一些步骤,日志能保证成功同步到JN,同时保证JN日志的一致性,进而备NN上同步日志时也能保证数据是完整和一致的。
    QJM读过程分析
    这个读过程是面向备NN(SNN)的,SNN定期检查JournalNode上EditLog的变化,然后将EditLog拉回本地。SNN上有一个线程StandbyCheckpointer(可以称为正在合并中的fsimage),会定期将SNN上FSImage和EditLog合并,并将合并完的FSImage文件传回主NN(ANN)上,就是所说的Checkpointing过程。下面我们来看下Checkpointing是怎么进行的。
    在2.x版本中,已经将原来的由SecondaryNameNode主导的Checkpointing替换成由SNN主导的Checkpointing。下面是一个CheckPoint的流向图:

    总的来说,就是在SNN上先检查前置条件,前置条件包括两个方面:距离上次Checkpointing的时间间隔和EditLog中事务条数限制。前置条件任何一个满足都会触发Checkpointing,然后SNN会将最新的NameSpace数据即SNN内存中当前状态的元数据保存到一个临时的fsimage文件( fsimage.ckpt)然后比对从JN上拉到的最新EditLog的事务ID,将fsimage.ckpt中没有,EditLog中有的所有元数据修改记录合并一起并重命名成新的fsimage文件,同时生成一个md5文件。将最新的fsimage再通过HTTP请求传回ANN。通过定期合并fsimage有什么好处呢,主要有以下几个方面:
    可以避免EditLog越来越大,合并成新fsimage后可以将老的EditLog删除可以避免主NN(ANN)压力过大,合并是在SNN上进行的可以保证fsimage保存的是一份最新的元数据,故障恢复时避免数据丢失
    主备切换机制要完成HA,除了元数据同步外,还得有一个完备的主备切换机制,Hadoop的主备选举依赖于ZooKeeper。下面是主备切换的状态图:

    从图中可以看出,整个切换过程是由ZKFC来控制的,具体又可分为HealthMonitor、ZKFailoverController和ActiveStandbyElector三个组件。
    ZKFailoverController: 是HealthMontior和ActiveStandbyElector的母体,执行具体的切换操作
    HealthMonitor: 监控NameNode健康状态,若状态异常会触发回调ZKFailoverController进行自动主备切换
    ActiveStandbyElector: 通知ZK执行主备选举,若ZK完成变更,会回调ZKFailoverController相应方法进行主备状态切换
    在故障切换期间,ZooKeeper主要是发挥什么作用呢,有以下几点:
    失败保护:集群中每一个NameNode都会在ZooKeeper维护一个持久的session,机器一旦挂掉,session就会过期,故障迁移就会触发
    Active NameNode选择:ZooKeeper有一个选择ActiveNN的机制,一旦现有的ANN宕机,其他NameNode可以向ZooKeeper申请排他成为下一个Active节点
    防脑裂: ZK本身是强一致和高可用的,可以用它来保证同一时刻只有一个活动节点
    那在哪些场景会触发自动切换呢,从HDFS-2185中归纳了以下几个场景:
    ActiveNN JVM奔溃:ANN上HealthMonitor状态上报会有连接超时异常,HealthMonitor会触发状态迁移至SERVICE_NOT_RESPONDING, 然后ANN上的ZKFC会退出选举,SNN上的ZKFC会获得Active Lock, 作相应隔离后成为Active结点。
    ActiveNN JVM冻结:这个是JVM没奔溃,但也无法响应,同奔溃一样,会触发自动切换。
    ActiveNN 机器宕机:此时ActiveStandbyElector会失去同ZK的心跳,会话超时,SNN上的ZKFC会通知ZK删除ANN的活动锁,作相应隔离后完成主备切换。
    ActiveNN 健康状态异常: 此时HealthMonitor会收到一个HealthCheckFailedException,并触发自动切换。
    Active ZKFC奔溃:虽然ZKFC是一个独立的进程,但因设计简单也容易出问题,一旦ZKFC进程挂掉,虽然此时NameNode是OK的,但系统也认为需要切换,此时SNN会发一个请求到ANN要求ANN放弃主结点位置,ANN收到请求后,会触发完成自动切换。
    ZooKeeper奔溃:如果ZK奔溃了,主备NN上的ZKFC都会感知断连,此时主备NN会进入一个NeutralMode模式,同时不改变主备NN的状态,继续发挥作用,只不过此时,如果ANN也故障了,那集群无法发挥Failover, 也就不可用了,所以对于此种场景,ZK一般是不允许挂掉到多台,至少要有N/2+1台保持服务才算是安全的。
    总结
    上面介绍了下关于HadoopHA机制,归纳起来主要是两块:元数据同步和主备选举。元数据同步依赖于QJM共享存储,主备选举依赖于ZKFC和Zookeeper。整个过程还是比较复杂的,如果能理解Paxos协议,那也能更好的理解这个。
    以上即是利用QJM和zookeeper实现HDFS的高可用,解决了单点故障问题。
    下面再介绍下另一特性:联邦Namenode Federation:—-解决了内存限制问题
    该特性允许一个HDFS集群中存在多个NN同时对外提供服务,这些NN分管一部分目录(水平切分),彼此之间相互隔离,但是本质还是共享了底层的DataNode资源,只不过上层分权了而已。

    架构设计:
    为了水平拓展Name Node,Federation使用了多个独立的Namenode/NameSpace。这些Name Node之间是联合的,也就是说他们之间相互独立且不需要相互协调,各自分工,管理自己的区域。分布式的datanode被用作通用的共享数据块存储设备,每个Datanode要向所有的NN注册,并周期性向所有NN发送心跳和块报告,并执行来自所有namenode的命令。
    这里引入了一个概念叫Block Pool块池,即单个命名空间的一组Block块(一个NN下维护多个DN),每一个命名空间可以根据大小和业务单独分配一个NN。
    DataNode是一个物理概念,Block Pool是一个重新将Block划分的逻辑概念,同一个dataNode中可以存在多个BlockPool的块,BlockPool允许一个命名空间NN在不通知其他NN的时候为一个新的Block创建Blockid,而一个NN失效同样也不会影响其下的DataNode为其他NN服务。
    每个BlockPool内部自治,不与其他Blockpool交流,一个NN挂了不影响其他NN。
    当DN和NN建立联系并开始会话后,自动建立BlockPool,每个Block都有一个唯一的表示,我们称之为拓展块ID,在HDFS集群中是唯一的,为集群归并创造了条件———blockid
    DN中的数据结构通过块池ID Block Pool id索引,即DN中的BlockMap,storage等都通过BPID索引。
    某个NN上的NameSpace和它对应的Block Pool一起被称为NameSpace Volume。它是管理的基本单位。当一个NN/NS被删除后,其所有DN上对应的Block Pool也会被删除。当集群升级时,每个NameSpace Volume作为一个基本单元进行升级。
    集群中提供多个NameNode,每个NameNode负责管理一部分DataNode:
    以上其实就可以理解为不同目录给不同部门管理,对不同命名空间做一个逻辑切分而已,其实下方存储还是没有变。
    如果一个目录比较大,建议用单独的NN维护。
    联邦本质:NN(元数据)与DN(真实数据)解耦,实际为共享数据
    HDFS快照快照snapshots是HDFS文件系统的只读的基于某时间点的拷贝,可以针对某个目录,或者整个文件系统做快照。快照比较常见的应用场景是数据备份,以防一些用户错误或灾难恢复。
    备份、灾备、快速恢复
    本质:也占空间,(但仅仅记录了block列表和大小而已,不涉及数据本身),即某个目录某一时刻的镜像,创建过程快O(1)
    快照的高效性实现:

    快照可以即时创建,耗时仅为O(1)。—excluding the inode lookup time
    只有当涉及到快照目录的修改被执行时,才会产生额外的内存消耗。而且内存消耗为O(M),其中M是被修改的文件或目录数。
    创建快照时,block块并不会被拷贝。快照文件中只记录了block列表和文件大小,不会做任何数据拷贝。
    快照不会对正常的HDFS操作有任何影响:创建快照以后发生的修改操作,被按操作时间的倒序(from newer to older)记录下来。所以当前的数据能被直接获取,而快照点的数据,则通过在当前的数据基础上减去执行过的操作来获取。

    • Snapshot 并不会影响HDFS 的正常操作:修改会按照时间的反序记录,这样可 以直接读取到最新的数据。• 快照数据是当前数据减去修改的部分计算出来的。• 快照会存储在snapshottable的目录下。• HDFS快照是对目录进行设定,是某个目录的某一个时刻的镜像• 对于一个snapshottable文件夹,“.snapshot” 被用于进入他的快照 /foo 是一个 snapshottable目录,/foo/bar是一个/foo下面的文件目录,/foo有一个快照s0,那么路径就是 :/foo/.snapshot/s0/bar
    • hdfs dfsadmin -allowSnapshot /user/spark • hdfs dfs -createSnapshot /user/spark s0 • hdfs dfs -renameSnapshot /user/spark s0 s_init • hdfs dfs -deleteSnapshot /user/spark s_init • hdfs dfsadmin -disallowSnapshot /user/spark注意:做过快照的目录,本身的父目录和子目录都不允许再次做快照。
    HDFS缓存
    1.0:user-》DN 经常读的数据提前加载到内存中
    2.0:集中式缓存—可指定要缓存的HDFS路径——-非递归,只缓存当前目录,缓存到当前机器本地内存中,DN Block-》DN内存

    HDFS中的集中缓存管理(Centralized cache management)是一种显式缓存机制,允许用户指定HDFS需要缓存的路径。NameNode将与磁盘上具有所需要的块的DataNodes进行通信,指示这些DataNodes将这些块缓存到 off-heap caches中。
    HDFS权限控制ACLHadoop中的ACL与Linux中的ACL机制基本相同,都是用于为文件系统提供更精细化的权限控制。首先参数上要开启基本权限和访问控制列表功能– dfs.permissions.enabled– dfs.namenode.acls.enabled
    基本命令操作:
    hdfs dfs -getfacl [-R] 获取目录和文件的ACL 信息– hadoop fs -getfacl /input/aclhdfs dfs -setfacl [-R] [-b |-k -m |-x ] |[--set ] 设置文件和目录的ACL信息– hdfs dfs -setfacl -m user:mapred:r-- /input/acl– hdfs dfs -setfacl -x user:mapred /input/aclhdfs dfs -ls 当ls的权限位输出以+结束时,那么该文件或目录正在启用一个ACL。请参考:Hdfs的ACL測试:https://www.cnblogs.com/mthoutai/p/6868846.html

    搭建这样的机制已在MR章节更新,这里再提醒下学习时可为体验:参考文章:https://www.cnblogs.com/selinux/p/4155814.html【192.168.87.150】master1:NN ZKFC RM【192.168.87.151】master2:NN ZKFC RM【192.168.87.155】slave1:DN NM ZK JN【192.168.87.156】slave2:DN NM ZK JN【192.168.87.157】slave3:DN NM【192.168.87.158】slave4:DN NM ZK JN
    这个章节没怎么写好,还在持续更新。
    2 留言 2019-01-26 19:11:09 奖励23点积分
  • 大数据3-1HDFS1.0

    前文链接:https://write-bug.com/article/2054.html
    前面的章节曾多次提到HDFS,而HDFS作为生态中一个不可或缺的角色足可说明其重要性。
    下面我们首先来介绍一下HDFS1.0:

    好我们进入no pic you say a j8 环节:那么这张图是什么意思呢?
    如图所示主要有三个角色:NameNode、DataNode、Client
    也就是主节点,从节点和客户端。简单地说,主节点主要存储着数据元信息,从节点存储着真实数据,client则是提交任务的客户端机器,其实本质就是向NN请求读写数据(在真正公司中是不会让你真正登陆到任何节点上的,只能是起另外一台机器去和master的地址通过配置文件起某种联系。),我们模拟是在master上提交任务的,后面会详细讲解这几个角色。
    HDFS 是一个主/从(Master/Slave)体系架构,由于分布式存储的性质,集群拥有两类节点 NameNode 和 DataNode。
    HDFS 由三个组件(本质是进程)构成:NameNode(NN)DataNode(DN)SecondaryNameNode(SNN)

    NameNode是主节点,也叫Master,一个hadoop集群只有一个NameNode
    DataNode是从节点,也叫Worker,一个hadoop集群可以有多个DataNode,一个节点只有一个Data Node
    SecondaryNameNode不是第二主节点,是助手节点,作用于数据持久化,你可以认为是一个镜像文件,备份。

    相信上面这几点细节,我们在前面搭建1.0集群时查看jps进程已经体会到了。
    client读取数据的时候,并不是从我们的namenode上读取数据,而是直接从datanode上读取,那他是怎么知道我要的数据在哪台datanode节点上呢?其实是NN告诉客户端的,我们后面会说这个问题。
    那么这个图下面的小方块是什么意思呢?
    HDFS数据的最小单位是block,一个Block默认大小是64M(可配置),这个可以直接影响到MapReduce并发个数,每个绿色相当于一个Block,如果有一个DataNode挂了,数据丢失,打破了3副本的平衡,由3变成2的时候,在配置文件里配3的目的是数据高可用性,所以要复制(Replication)一个副本,这样就保证了一个balance。
    client往NameNode上提交任务时,不是说client有请求时,我的NN和DN才进行交互,而是你来了一个任务,我就去查看我的DN有哪些是空闲的,再继续往空闲的上分配任务,那么NN是如何知道哪些DN是空闲的呢?是Heartbeat(心跳机制),这里我的NN通过这个机制管理了整个集群的DataNode,就相当于管理了所有DN上的Block。

    Heartbeat(心跳机制):NameNode与DataNode之间存在心跳连接,DataNode每隔3s就主动向NameNode汇报健康状况,资源利用情况、任务调度等。
    balance:当一个datanode挂了,block,为了达到平衡状态,内部数据会平均分配到其他datanode上。

    下面我们来详细介绍下这几个组件:
    NameNode(NN)1)管理着文件系统命名空间:维护着文件系统树及树中的所有文件和目录,存储元信息:

    文件名目录名及它们之间的层级关系
    文件目录的所有者及其权限
    每个文件块的名及文件有哪些块组成

    存储元数据信息(描述数据的数据)的映射关系path->blockid_list 这个是什么意思呢?我们在hadoop 命令查看文件的时候,我们直接可以看到目录,文件及里面的信息,对我们来说是透明的,但这个文件其实是被切分好后散落在不同的机器上的,而具体在哪里我们是不需要关心的,Name Node帮我们记录了文件包含了那些Block,而它仅仅告诉了我们存储在哪个block上的名字(block号),但对于client来说真正有意义的则是这些block在哪些机器上。
    block->datanode(节点地址) 在这个映射中我们就知道block是在哪台dataNode上了。
    这个时候我们再看上面那个图,刚才提出的问题client如何知道去哪台机器读取自己想要的信息:(这里解决了client访问时如何知道数据在哪个DataNode机器上的问题)
    那我们已经知道了去哪台机器上读取数据了,读取什么数据呢?这时候就涉及到了datanode的映射关系:block->path 这样我们就知道读取哪个具体路径信息了。所以datanode在Name Node里面只是作为value形式存在。
    那么元数据存储在什么地方呢:

    一:内存: 响应请求的数据在内存中—-真正对外服务的数据
    二:磁盘:数据持久化—-这里就引出了SNN的作用

    元数据放在内存中,重启机器或机器宕机时,内存中的数据消失,为防止数据丢失,定期把内存中的数据dump(转储)到edit logs中(引出延迟问题),这里需要用到SNN定期把edit logs中的数据merge到fsimage(元数据镜像文件—本地磁盘)中(引出二次延迟问题),这个过程被称为SNN的持久化过程,相当于备份。当机器重启时,load fsimage文件——-这里引出了数据备份更新时的延迟问题。
    2)通常搭建机器时,定位为master机器———-主
    Job Tracker也在其上运行,(但在大公司中有需要的情况下,这两个是可以部署在两台机器上的)——-本地化原则(就近原则)减少网络IO和拷贝时间成本。
    3)Hadoop更倾向存储大文件原因:
    一般来说,一条元信息记录会占用200byte内存空间。假设块大小为64MB,备份数量是3,那么一个1GB大小的文件将占用16*3=48个文件块。如果现在有1000个1MB大小的文件,则会占用1000*3=3000个文件块(多个文件不能放到一个块中)。我们可以发现,如果文件越小,存储同等大小文件所需要的元信息就越多,所以Hadoop更喜欢大文件。
    4)元信息持久化
    在NameNode中存放元信息的文件是 fsimage。在系统运行期间所有对元信息的操作都保存在内存中并被持久化到另一个文件edits中。并且edits文件和fsimage文件会被SecondaryNameNode周期性的合并。
    内存中的元数据定期存储到fsimage中:edits只dump修改的文件,fsimage是比较旧的文件,定期把相对实时的edits合并到fsimage中,在Secondary Name Node进程中执行。

    5)1.0时,还没有zookeeper和好的集群解决方案,所以一个集群中只有一个NN,但也因为如此,引起了单点故障问题,而以当时的技术,想要解决单点故障问题,只有NFS和SNN,SNN前面说了,负责镜像文件的备份,但还是有延迟问题存在,避免不了丢失部分数据,NFS是远程网络挂载硬盘,但是增加了时间成本,所以无论那种方法都不是一种好的解决方案。
    6)运行NameNode会占用大量内存和I/O资源,一般NameNode不会存储用户数据或执行MapReduce任务。
    NameNode只有一个进程,挂掉了就没了,而且数据变大了,不能持久化,内存不够,直接制约了集群的大小,这就是1.0的局限性。
    DataNode(DN)
    由block数据块组成,存储真实数据。根据NN指示做创建、复制、删除等操作。
    心跳机制:每3秒向NN发送心跳报告健康状况以及任务进程等。
    副本机制:防止数据丢失,保证数据冗余,数据高可用,是一个以空间换取安全性的机制。(同机架两个不同位置,第三个不同机架随机位置)
    通常搭建机器环境时,定位为Slave机器——-从

    TaskTracker 和其在同一机器上运行,和前面一样,本地化原则。
    Block—-数据块

    磁盘读写的基本单位
    HDFS默认数据块大小64MB,磁盘块一般为512B
    块增大可以减少寻址时间,降低寻址时间/文件传输时间,若寻址时间为10ms,磁盘传输速率为100MB/s,那么该比例仅为1%
    数据块过大也不好,因为一个MapReduce通常以一个块作为输入,块过大会导致整体任务数量过小,降低作业处理速度

    Block副本放置策略

    一:放在客户端相同的节点(如果客户端时集群外的一台机器,就随机算节点,但是系统会避免挑选太满和太忙的节点)
    二:放在不同机架(随机选择)的节点
    三:放在与第二个副本同机架不同节点上

    distance(/D1/R1/H1,/D1/R1/H1)=0 相同的datanode
    distance(/D1/R1/H1,/D1/R1/H3)=2 同一rack下的不同datanode
    distance(/D1/R1/H1,/D1/R2/H5)=4 同一IDC下的不同datanode
    distance(/D1/R1/H1,/D2/R3/H9)=6 不同IDC下的datanode


    5)如何保证数据完整性?
    在Client读写过程中为了保证数据完整性,HDFS中有一个数据校验:CRC32(循环冗余校验码)可以类比MD5,只是不同的加密方法,数据读写过程中,一共校验两次:client写到DN之前以每隔一个单位(512字节)创建一个校验码并加上数据(相当于打了个标记)写到DN上,第二次是DN接收数据时,同样每隔512求一个校验码和发送过来的比对,以上这样即保证了数据的完整性。
    第二方式:在后台存在一扫描进程:DataBlockScanner,一旦检测出问题Block,通过心跳通知NN,NN就会让DN修复此问题Block(即从副本上拷贝数据)
    Second Name node虽然在名字上看就像是第二节点,其实不是,他并不是NN的备份,只是一个SNN进程,前面我们已经引出了很多NN的容错机制,下面我们来介绍一下它的具体功能:
    这里主要有两个文件:

    fsimage:它是在NameNode启动时对整个文件系统的快照(镜像文件)
    edit logs:它是在NameNode启动后,对文件系统的改动序列(存在磁盘中)

    两个文件状态:数据是不断改动的,而NameNode是很少重启的,所以edit log是不断增大,而fsimage是比较旧的,永远也赶不上edit log文件的;
    NameNode重启或宕机那一瞬间,内存数据瞬间消失,如何才能找回数据呢?

    NameNode重启之后会先读fsimage,之前跟eidt,两者合并才能得到完整数据,这时edit log数据会比fsimage大很多,合并需要很长时间,这时就需要一个机制,尽可能想办法怎样减小你的eidt,并且让fsimage这个文件能够尽可能的保持最新的数据状态,这样的话NameNode重启的话就不需要有合并这样的影响了,它就可以从fsimage直接读数据然后启动。所以这时就引出了SecondNameNode。

    SecondNameNode作为助手节点,检查点节点,它的作用为:

    用来保存HDFS的元数据信息,比如命名空间信息、块信息等,由于这些信息是在内存的,SecondNameNode是为了考虑持久化到磁盘
    定时到NameNode去获取edit logs,并更新到fsimage[Secondary NameNode自己的fsimage]
    一旦它有了新的fsimage文件,它将其拷贝回NameNode中。(这时有两个数据一样的fsimage)
    NameNode在下次重启时会使用这个新的fsimage文件,从而减少重启的时间。

    Secondary NameNode所做的不过是在文件系统中设置一个检查点来帮助NameNode更好的工作。它不是要取代掉NameNode也不是NameNode的备份。
    NameNode把每一次改动都会存在edit log中,但是整个事件是由谁来触发的?(DataNode)
    元数据持久化的过程(SNN来完成):内存->edit log(磁盘)-> fsimage
    fsimage:多久加载一次?(重启才会加载)
    假如我们修改了数据,内存中的数据好改,但是磁盘中的镜像文件(磁盘中的镜像文件是由hadoop namenode-format生成的叫做fsimage)不好改,那么我们怎么做呢?客户端做的操作,内存中会立刻修改,同时会在磁盘中记录日志(这个日志也有名字:edits)。这样假如namenode宕机了,我们也可以根据日志记录恢复,如果数据过多,会让启动时间变长。所以namenode就会有一个助理secondary namenode,他会定期的将日志文件下载过来,而且第一次下载的时候会把镜像文件fsimage下载过来secondary namenode会有元数据计算引擎。它会把fsimage(镜像)加载到内存中变为内存元数据对象,然后元数据计算引擎会解析edits日志文件。一边加载,一边解读,一边修改元数据。等加载完之后这批数据就是比较新的了。接着secondary namenode要把元数据重新序列化成fsimage镜像文件,将其上传给namenode.这样做的话如果namenode宕机之后,它只要下载最新编号的镜像文件即可,而secondary namenode不必每次都下载镜像文件,因为它本身就拥有最新编号的镜像文件。它只需要下载最新编号的日志文件。但是是不是每次上传的日志以及镜像都会保存?当然不会,因为这样太占空间,对于镜像文件他会保留最近的两个,对于日志会多保留几个,但是过期的日志也不会马上删掉。SNN edit目录树

    NN edit目录树


    edits_inprogress文件为正在写的edit文件seen_txid文件:
    每重启一次这个数值就会再 + 1seen_txid文件记录的是edits滚动的序号,每次重启namenode时,namenode就知道要将哪些edits进行加载到内存.
    VERSION 文件 :
    各个属性的含义:
    namespaceID 是文件系统的唯一标识,在文件系统首次格式化之后生成;
    storageType 说明这个文件存储的是什么进程的数据结构信息(如果是 DataNode, 那么 storageType=DATA_NODE)
    cTime 表示NameNode 存储时间的创建时间,由于我的NameNode没有更新过,所以这里的记录值为0,以后对NameNode升级之后,cTime将会记录更新时间戳.
    layoutVersion 表示 HDFS 永久性数据结构的版本信息,只要数据结构变更,版本号也要递减,此时的HDFS也需要升级,否则磁盘仍旧是使用旧版本的数据结构,这会导致新版本的NameNode 无法使用;
    clusterID是系统生成或手动指定的集群ID,在-clusterid选项中可以使用它; (6) blockpoolID:是针对每一个Namespace所对应的blockpool的ID,上面的这个BP- 893790215-192.168.88.101-1383809616115就是在我的ns1的namespace下的存储块池的ID,这个ID包括了 其对应的NameNode节点的ip地址。

    备份过程:

    将hdfs更新记录写入一个新的文件——edits.new。将fsimage和editlog通过http协议发送至secondary namenode。将fsimage与editlog合并,生成一个新的文件——fsimage.ckpt。这步之所以要在secondary namenode中进行,是因为比较耗时,如果在namenode中进行,或导致整个系统卡顿。将生成的fsimage.ckpt通过http协议发送至namenode。重命名fsimage.ckpt为fsimage,edits.new为edits。等待下一次checkpoint触发SecondaryNameNode进行工作,一直这样循环操作。注:checkpoint触发的条件可以在core-site.xml文件中进行配置。fs.checkpoint.period表示多长时间记录一次hdfs的镜像。默认是1小时。fs.checkpoint.size表示一次记录多大的size,默认64M。模拟数据恢复hdfs文件系统删除之后的备份:删除之前hdfs文件系统上的所有的文件:

    接下来删除 dfs下的name文件:

    使用jps查看进程,发现hadoop进程还是启动着,原因是在内存中运行,可以杀死NameNode进程:

    然后手动启动namenode:hadoop-daemon.sh start namenode

    但是此时hadoop集群已经拒绝连接了,其他的机器也都是这样:

    并且在master上的namenode并没有启动成功:

    将namesecondary这个文件夹的内容全部复制到当前目录,取名叫作name

    复制好了这个namesecondary之后,再次启动namenode:

    这次能够正常启动,并且能够正常查询到hadoop集群上的hdfs文件系统上的文件,恢复了之前的样子.这两个工作目录 name 和 namesecondary 的结构是一样的.所以就可以用namesecondary来恢复.日过此时在hdfs上新建一个文件,然后把name目录删除,重复上面的恢复的步骤,那么这个新建的文件是不会恢复的,这个刚刚新建的文件还只是在日志文件里面,还没来得及做合并.namesecondary里面还没有这个元数据.所以恢复出来之后就会少一个文件.综上,namesecondary就可以做一个数据源的备份.checkpoint 的附滞作用:namenode 和 secondary namenode 的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode 的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据.
    总结前面已经引出了部分1.0中存在的问题,总结下:

    单点故障NameSpace(命名空间的限制):由于Namenode再内存中存储所有的元数据(metadata),因此单个Namenode所能存储的对象(文件+块)数目收到Namenode所在JVM的heap(堆) size的限制。50G的heap能够存储20亿个对象,这20亿个对象支持4000个datanode,12PB的存储(假设文件爱呢平均大小为40MB)。随着数据的飞速增长,存储的需求也随之增长。单个datanode从4T增长到36T,集群的尺寸增长到8000个datanode。存储的需求从12PB增长到大于100PB。(内存的限制)
    性能的瓶颈:由于是单个Namenode的HDFS架构,因此整个HDFS文件系统的吞吐量受限于单个NameNode的吞吐量。
    隔离问题:由于HDFS仅有一个Namenode,无法隔离各个程序,因此HDFS上的一个实验程序很可能影响整个HDFS上运行的程序。
    集群的可用性:在只有一个Namenode的HDFS中,此Namenode的宕机无疑会导致整个集群的不可用。(低可用性)
    Namespace和Block Management的紧密耦合:Hadoop 1.x在Namenode中的Namespace和Block Management组合的紧密耦合关系会导致如果想要实现另外一套Namenode方案比较困难,而且也限制了其他想要直接使用块存储的应用。
    不适合小文件存储———数据块富余过多,浪费空间
    压缩文件——单独占一个split——-过大时存在问题
    内存问题—-元数据多时,内存不够存储
    不建议大量随机读(无法优化寻址时间成本),可以预读
    异步操作:速度快,不能保证数据完整性

    下面总结一下这里的可靠性保证:

    心跳机制
    副本机制
    CRC32数据校验
    DataBlockScanner
    SNN:保证元数据
    回收站:trash目录(core-site.xml设置)
    报告 命令 例:hdfs fsck /path -files - blocks -locations
    快照 命令

    HDFS 特性:

    能做什么

    存储并管理PB级数据处理非结构化数据 hive,hbase注重数据处理的吞吐量(延迟不敏感) 内存预写应用模式:write-once-read-many存取模式(无数据一致性问题)一次/user写入多次/user读取写错:通常是删除和追加不影响吞吐量
    不适合做

    存储小文件(不建议)大量随机读(不建议) 文件指针需要对文件修改(不支持)多用户写入(不支持)




    客户端调用 create()来创建文件,Distributed File System 用 RPC 调用 NameNode节点,在文件系统的命名空间中创建一个新的文件。NameNode 节点首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。Distributed File System 返回 FSDOutputStream,客户端用于写数据。客户端开始写入数据,FSDOutputStream 将数据分成块,写入 Data Queue。Data Queue 由 DataStreamer 读取,并通知 NameNode 节点分配数据节点,用来存储数据块(每块默认复制 3块)。分配的数据节点放在一个 Pipeline 里。Data Streamer 将数据块写入 Pipeline 中的第一个数据节点。第一个数据节点将数据块发送给第二个数据节点。第二个数据节点将数据发送给第三个数据节点。DFSOutputStream 为发出去的数据块保存了 Ack Queue,等待 Pipeline 中的数据节点告知数据已经写入成功。写入一个datanode都说明写入数据成功,内部datanode会数据冗余。




    首先 Client 通过 File System 的 Open 函数打开文件,Distributed File System 用 RPC调用 NameNode 节点,得到文件的数据块信息。对于每一个数据块,NameNode 节点返回保存数据块的数据节点的地址。Distributed File System 返回 FSDataInputStream 给客户端,用来读取数据。客户端调用 stream的 read()函数开始读取数据。FSDInputStream连接保存此文件第一个数据块的最近的数据节点。DataNode 从数据节点读到客户端(client),当此数据块读取完毕时,FSDInputStream 关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。当客户端读取完毕数据的时候,调用FSDataInputStream 的 close 函数。在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。失败的数据节点将被记录,以后不再连接。
    同步与异步问题:

    同步:速度慢,但是可以保证全局数据一致性异步:速度快,但无法保证数据一致性—-HDFS用的就是异步操作
    本地模式

    本地模式:保证计算框架和任务调度管理部署在同一台机器上,体现本地化原则,尽量减少数据移动开销。类比后面的yarn功能把JobTracker的功能都分权了分为RM、AM。
    本地化原则针对map阶段,因为reduce阶段有个远程partition,不能保证是同一机器。

    下节我们介绍HDFS2.0原理,看看它是如何解决这些1.0中的故障问题的。这节还会持续修改
    4 留言 2019-01-23 20:15:29 奖励18点积分
  • 大数据2-3MR进阶

    前文链接:https://write-bug.com/article/2049.html
    由于前面的文章再进行深入文字过多会影响阅读体验,所以决定再抽出一篇文章:
    这节主要有这么几个重点:

    MR基础深入
    Hadoop streaming开发,支持大多数语言开发MR
    Hadoop2.6.5搭建
    实践

    MR基础深入
    还是用这个图来讲解:

    map和reduce算子都是采用多进程的并发方式,便于每个任务占用独享空间,方便对每一个任务进行资源调度与调配,但是进程比线程会消耗更多的启动时间,直接制约着任务延迟,失效性不高,只能做离线高吞吐任务,而后面的spark-core反而是采用了多线程模型,所以速度更快,但是吞吐量大时稳定性不是太高
    HDFS:文件要存储在HDFS中,每个文件切分 成多个一定大小(默认64M)的Block (默认3个备份)存储在多个节点 (DataNode)上,文本格式:text file明文格式,但是浪费空间,SquenceFile格式默认<k,v>对存储,支持压缩等等格式
    InputFormat接口: MR框架基础类之一

    数据分割(Data Splits)防止句子被切散,当有一条记录横跨两个block时,在分割过程中会被分割到前面的block中去,所以就会出现split和block不完全相等大小的情况
    记录读取器(Record Reader)以标准输入每读取一条记录,就调用一次map函数,这条记录作为map的参数,读取下一条记录直到split尾部

    map:逻辑处理,标准输入,标准输出
    shuffle:Partion(对keyhash), Sort(对key sort), Spill(溢写), Meger, Combiner(map端的局部reduce), Copy, Memery, Disk……
    partitioner:由{key,vlaue}—>{partition号,key,value}partition号:key ->hash 取模 ->返回m
    Mermory Buffer: shuffle过程都是在这里完成的(比如partition号已经确定,对key排序,合并combiner等等),上节已经说过默认100M,溢写阈值0.8:80M,清空内存,dump到本地

    有时逻辑处理combiner不适用:比如求中值
    reduce : 逻辑处理,标准输入,标准输出,同样也有着内存缓冲区:spil,sort等功能,前面map端的partition和combiner直接影响到这里获取的输入源不会出现key重复的情况,所以合并reduce输出文件就是最终结果了。
    slot概念:map和reduce有着不同的slot槽位—-cpu核数-1—-一种容器资源

    mapred.tasktracker.map.tasks.maximum(默认2) mapreduce.tasktracker.tasks.reduce.maximum(默认2)mapred.local.dir(map中间结果存放路径)dfs.data.dir(hdfs目录存放路径)

    在这里我们列举一下其中的细节知识点:

    JobTracker只做管理和通知,数据只在map和reduce之间流动,准确的说,只会在TaskTracker之间流动。
    排序是框架内置的,默认就有。分组不是减少网络开销,分组不是合并,只是把相同的key的value放到一起,并不会减少数据.分组是给了同一个map中相同key的value见面的机会.作用是为了在reduce中进行处理.map函数仅能处理一行,两行中出现的这个单词是无法在一个map中处理的.map不能处理位于多行中的相同的单词.分组是为了两行中的相同的key的value合并到一起。
    在自定义MyMapper类内部定义HashMap处理的是一个block,在map方法内部定义处理的是一行。
    在hadoop全局中不会有线程问题,因为hadoop起的是进程,不会有并发问题存在.这也就造成了MR的速度问题。
    map和reduce不是1对1的,通常map数量远远超过reduce,reduce常常是每个节点上一个。
    map个数为split的份数。
    Reduce个数等于输出个数
    dfs.block.size决定block大小,默认64M—-hdfs-site.xml
    在memory排序:速排—-80%水位线:先排序再溢写
    把每一个block变成一个RecordReader
    分而治之思想:分:map(split)—-一对一的关系 分发:patition—-相同的key统一到一个reduce上 合:reduce
    压缩文件不可切分,非压缩文件和sequence文件可以切分
    增加task的数量,一方面增加了系统的开销,另一方面增加了负载平衡和减小了任务失败的代价;
    map task的数量即mapred.map.tasks的参数值,用户不能直接设置这个参数。Input Split的大小,决定了一个Job拥有多少个map。默认input split的大小是64M(与dfs.block.size的默认值相同)。然而,如果输入的数据量巨大,那么默认的64M的block会有几万甚至几十万的Map Task,集群的网络传输会很大,最严重的是给Job Tracker的调度、队列、内存都会带来很大压力。
    mapred.min.split.size这个配置项决定了每个 Input Split的最小值,用户可以修改这个参数,从而改变map task的数量。
    一个恰当的map并行度是大约每个节点10-100个map,且最好每个map的执行时间至少一分钟。
    reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
    合适的reduce task数量是0.95或者0.75*( nodes mapred.tasktracker.reduce.tasks.maximum), 其中,mapred.tasktracker.tasks.reduce.maximum的数量一般设置为各节点cpu core数量,即能同时计算的slot数量。对于0.95,当map结束时,所有的reduce能够立即启动;对于1.75,较快的节点结束第一轮reduce后,可以开始第二轮的reduce任务,从而提高负载均衡。

    Hadoop streaming
    MapReduce和HDFS采用Java实现,默认提供Java编程接口

    Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中 使用 Streaming方便已有程序向Hadoop平台移植,可移植性好
    原理:在map和reduce外层套了一层标准输入标准输出,内层语言就没有了强依赖,所以在开发过程中,只需要按照一定格式标准输入读取数据,标准输出写数据即可
    容易单机调试:cat input |mapper|sort|python reducer > output
    局限:默认只能处理文本数据,如果处理二进制文件,需要先进行编码转化文本。效率没有Java开发效率高(两次拷贝和解析分割)
    脚本参数:

    input: 指定作业的输入文件的HDFS路径,支持使用*通配 符,支持指定多个文件或目录,可多次使用output: 指定作业的输出文件的HDFS路径,路径必须不存在, 并且具备执行作业用户有创建该目录的权限,只能 使用一次mapper: 用户自己写的mapper程序 “python 、bash等”reduer: 用户自己写的reduce程序 —不是必须的(比如简单过滤)file:本地文件(提交命令的机器)分发到各个节点,适合小文件cachefile:hdfs压缩文件分发到各个节点archivefile:hdfs压缩目录分发到各个节点(自动解压)

    这里有一些jobconf的基本配置:(高版本:-D)

    mapred.map.tasks:map task数目:split、压缩文件、block个数
    mapred.reduce.tasks:控制reduce task数目:个数适当设置,过多输出大量小文件,过少出错还要从新map成本过高
    stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
    num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个 key
    mapred.job.name:作业名
    mapred.job.priority:作业优先级
    mapred.job.map.capacity:最多同时运行map任务数
    mapred.job.reduce.capacity:最多同时运行reduce任务数
    mapred.task.timeout:任务没有响应(输入输出)的最大时间
    mapred.compress.map.output:map的输出是否压缩
    mapred.map.output.compression.codec:map的输出压缩方式
    mapred.output.compress:reduce的输出是否压缩
    mapred.output.compression.codec:reduce的输出压缩方式
    stream.map.output.field.separator:map输出分隔符
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

    利用该配置可以完成二次排序
    -jobconf org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \利用该配置可以完成key排序
    -jobconf stream.num.map.output.key.fields=1 \设置map分隔符的位置,该位置前的为key,之后的为value
    -jobconf mapred.text.key.partitioner.options="-k1,1" \选择哪一部分做partition,根据几到几分区
    -jobconf mapred.text.key.comparator.options="-k1,1n" \设置key中需要比较的字段或字节范围
    Hadoop2.6.5搭建环境:centos7
    http://note.youdao.com/noteshare?id=830761ebbc56f38c926030ca882b4313&sub=482AF51AF94B4E60BA1DE02B7C850B0F这个搭建还是用的HDFS1.0的容灾机制,2.0还可以使用,但在工作中是完全使用2.0机制的,搭建流程请参考(体验):参考文章:https://www.cnblogs.com/selinux/p/4155814.html【192.168.87.150】master1:NN ZKFC RM【192.168.87.151】master2:NN ZKFC RM【192.168.87.155】slave1:DN NM ZK JN【192.168.87.156】slave2:DN NM ZK JN【192.168.87.157】slave3:DN NM【192.168.87.158】slave4:DN NM ZK JN
    实践代码流程
    wordcount
    分发,分发方式:

    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \-cacheFile "hdfs://master:9000/white_list#ABC" \-file ./white_listcacheArchiveFile ——map.py

    os.path.isdir()用于判断对象是否为一个目录
    os.listdir()用于返回一个由文件名和目录名组成的列表,需要注意的是它接收的参数需要是一个绝对的路径
    os.path.isfile()用于判断对象是否为一个文件
    压缩文件传过来,传在read_local_file_func函数中,再通过get_cacheFile_handlers函数中的isdir判断是否目录,再通过listdir遍历目录返回文件名和目录名,再通过get_file_handler函数把文件以只读进来返回文件(文件的指针将会放在文件的开头)。追加f_handlers_list数组中,返回数组给read_local_file_func函数作strip,把白名单中的单词分到set中返回set,之后再判断word是否在set中然后输出。

    red.py

    map输出文件读进来分别付给key和value值,利用一个当前单词和计数池数组mege数组中的单词和数量,之后清空,循环,输出。
    全排序
    第一个:排序

    a.txt b.txt读进来,做split后赋给key,val由于mr是按字典排序,所以需要加成相同字段数red中还原字段输出配置中:强制用一个reduce来处理
    第二个:排序之后再按前50 后50 分成两个桶

    map中模拟一个partition号idx加在前面标识,red中还原数据输出配置中:两个reduce处理,一个partition,从第二项后面分割数据,第三项为val
    第三个:全部利用配置来排序

    map读进来分割后red输出配置中:一共四个字段,前三个为key,后一个为value,根据二三做partition,三个reduce处理

    表单joinAB 表
    A.left join(B)
    A.key <— B(拼)

    Step1: map_a给a文件强制中间加flag 1
    Step2: map_b给b文件强制中间加flag 2
    Step3

    map_join把两个文件以w(打开一个文件只用于写入。如果该文件已存在则将其覆盖。如果该文件不存在,创建新文件。)方式打开strip分别赋值给key1,key2输出red_join标准输入流进来split中 赋值给key,flag,val.判断flag 设立中间val值,输出key,v1,v2,清空中间值

    鲁棒:虽然可以设置一个flag1,隔一个flag2,但是根据业务不同:

    key出现多次flag全是1—-没有join上,但是把key保留住,A中key字段在B中没有,所以需要表示null等flag全是2
    按字典排序的
    Linux:head -20 ip.lib.txt |column -t 格式化挖掘数据,与算法结合折半查找:二分法——排好序的列表
    mid=low+high /2定位
    截断一半high=mid-1
    函数load_ip_lib——func只读加载文件,split,判断是否5个字段,分别赋值,追加给list数组返回,mapper把返回数组传进来,同样对cookie文件split,判断是否2个字段,分别赋值,再把IP字段和list数组传入get_addr函数二分法判断IP字段,并且根据ip段判断province,返回province,mapper输出cookie,IP,address
    -reducer "cat" \-jobconf "mapred.reduce.tasks=2" \-jobconf "mapreduce.reduce.memory.mb=5000" \输出数据压缩怎么控制让mr任务支持压缩功能,通过压缩形式控制后续mr的并发个数
    -jobconf "mapred.reduce.tasks=10"-jobconf "mapred.compress.map.output=true" \#中间结果map的输出开关-jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \#Gzip格式-jobconf "mapred.output.compress=true" \#最后压缩-jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \再把输出数据当作输入看mapcat(标准输入输出)是不是10个map个数
    总结分类

    数据统计 hive(mr)—-word count
    数据过滤—-白名单
    同类汇聚—-join
    全局排序—-单/多reduce
    容错框架(报警邮件)

    Linux:cat ip.lib.txt |awk '{print $1,$NF}'| head 只保留第一个字段和最后一个字段思考:我们如何把这些散装的实践和思想进行拼装应用在业务上呢?
    2 留言 2019-01-23 13:49:16 奖励23点积分
显示 0 到 25 ,共 25 条
eject