分类

类型:
不限 游戏开发 计算机程序开发 Android开发 网站开发 笔记总结 其他
评分:
不限 10 9 8 7 6 5 4 3 2 1
原创:
不限
年份:
不限 2018 2019

技术文章列表

  • 在DLL中创建共享内存

    背景我们都知道在 32 位系统上,每个进程都有自己 4GB 大小的独立空间,互不影响。当然,对内核了解的同学则会质疑说,4GB 大小的内容可以大致分为两部分,低 2 GB内存空间是用户地址空间,高 2 GB是内核地址空间,而内核地址空间是共享的,并非独立。是的,这没错。所以,本文中所指的独立空间是用户地址空间。
    举个例子来说,进程1 的 0x400000 内存地址和 进程2 中 0x400000 内存地址是没有任何关联。任意修改其中一个内存里的数据,是不影响另一个的。这边是进程独立性。
    但是,本文要介绍的这个知识点,就是要突破这个独立性的显示,创建进程共享内存。确切的说是在DLL中创建共享内存,就是在DLL中创建一个变量,然后DLL被加载到多个进程空间,只要其中一个进程修改了该变量的值,其他进程DLL中的这个值也会改变,就相当于多个进程共享一块内存。
    实现原理实现原理比较简单,就是先为DLL创建一个数据段,然后再对程序的链接器进行设置,使其在程序编译完毕开始链接的时候,根据设置的链接选项,把指定的数据段链接为共享数据段。这样,就可以创建共享内存了。
    #pragma data_seg("MySeg") char g_szText[256] = {0};#pragma data_seg()#pragma comment(linker, "/section:MySeg,RWS")
    例如,在上面的代码中,我们使用 #pragma data_seg 创建了一个名为 MySeg 的数据段,接着使用 /section:MySeg,RWS 把 MySeg 数据段设置为可读、可写、可共享的共享数据段。
    程序测试我们开发一个程序加载这个创建了共享内存的DLL,然后调用它的导出函数 Change 来修改共享内存变量的值,调用 Show 来显示共享内存变量的值。发现,只要有一个进程修改了这个变量,其他所有进程里的变量也跟着改变。

    总结这个实现上比较简单,难点就是在思想上的转换,要理解什么是共享内存的思想。
    参考参考自《Windows黑客编程技术详解》一书
    3 留言 2018-11-07 11:28:40 奖励5点积分
  • 5G将渗透哪些领域

    5G爆发前夕将渗透哪些领域?5G 已经被吹捧为许多细分市场的新的赋能者,它将给移动电话、自动驾驶、虚拟现实(VR)和物联网等行业带来机会。但是,这个新的无线通信标准将在何时和将以何种方式影响这些细分市场,以及它将对半导体设计产生什么样的影响,目前尚有很多问题和不确定性。
    基于 5G 承诺的通信速度的大幅度提高和延迟的大幅度降低,系统供应商必须在将数据处理放在本地或在云端之间作出决定。这将对半导体体系结构产生重大影响,包括处理器和内存的片上吞吐量、I/O 速度、功率预算,甚至电池大小等等。此外,这些决定还将受到 5G 基础设施接入和通信频率的影响。
    不过,这些事情可能要等到数年后才会发生。一开始,大部分的 5G 应用将在低于 6GHz 的范围内,也就是和 4.5G 相当。最大的受益者将是移动电话行业,它将在一段时间内仍然是 5G 技术的最大消费者。移动电话的标准和存储容量将会因为 5G 技术的不断引入而持续进化,它们将依然是 5G 技术的发展的重要资金来源。
    下一个阶段的发展是在毫米波技术引进之时。这时候最重要的变化将开始发生。一般的经验法则是,任何一项新技术想要成功,必须提供10倍的增益,要么是性能提高、或功率降低、成本降低、面积更小或者多种增益的组合。只有在那个时候,5G 技术才会真正地大放光彩。
    “5G 将在连接性能方面提供显著的改进,其目标是比目前 4G 的连接性能提高1000倍”。Steven Woo 说,他是 Rambus 实验室负责系统和解决方案的副总裁、杰出的发明家。 “除了带宽方面的改进,5G 还承诺了降低延迟和更好的覆盖范围。”
    在毫米波技术的应用初期,设计成本和硅晶元出货面积将会显著上升。电力消耗将成为主要的问题,这取决于基础设施和它能承受的负荷压力,因为它们需要承担信号发送和信号到达时的计算工作。
    物联网一个真正能从 5G 中受益的领域是边缘计算,其中功耗是一个限制因素。“我们希望 5G 的引入能使物联网边缘设备的功耗更低,因为平均来说 5G 拉近了它们与接入点的距离。” Cadence 的产品营销总监 Neil Robinson 说:“这意味着与 4G 通信所需要的更长距离相比,5G 通信所需要的功耗更低”。
    这样就打开了一扇大门,让我们有能力处理比目前更加复杂的处理流程和通信方案。Woo 说:“5G 将提供更高的带宽,这意味着越来越多的终端(endpoint)将会更容把它们的数据传输到相邻位置,在那里这些数据得以在本地处理。”这意味着只有少量的,级别更高的数据/信息,才需要传输到云端处理。
    但这种方案也会很快变得复杂。“5G 所需的高带宽和多天线策略意味着,从本质上讲,它们需要更高的能耗。”西门子仿真部门市场营销高级总监 Jean-Marie Brunet 指出。“然而,人们普遍认为物联网的大部分将是机对机(M2M)通信。而机对机通信模式比人工启动的物联网更容易预测,因此机对机实例的低功耗算法应该更高效,这种说法是有争议的。”
    实际上的好处将随着不同的应用和区域有所不同。“归根结底,5G 将使每比特功耗降低10倍到100倍,同时带来相当于10倍的带宽增幅,这些变化将在元器件的寿命方面产生净增益,并同时提高10倍的功效。因使用模式的不同这些好处会有所不同。”
    这是否足以影响架构?物联网边缘设备已开始将推理运算本地化,以避免与向云端传输大量原始数据而带来的带宽影响。
    “更高的带宽和更低的延迟将使在云端进行推理运算变得更容易,如果我们需要这样做的话。” Cadence 的 Robinson 说。“但是,隐私、安全、延迟和功耗问题可能会让这种方式变得不合适。”
    Cadence 的 Tensilica IP 产品管理和营销高级总监 Lazaar Louis 也回应了这些担忧。“对隐私和安全问题的担忧将会使推理运算继续留在边缘设备进行,”他说。“将传感器收集到的信息传送到云端会消耗能量,因此在边缘设备进行推理运算,可以节省在边缘设备上消耗的能量。”
    Rambus 实验室的 Woo 同意这一说法。“更高的带宽可能不能排除在边缘设备上进行推理运算的必要性,但它们将允许更大数量的物联网设备相互连接,使基础设施能够跟上不断增长的物联网设备的需求,并使其捕获和传输的数据量不断增长。”
    VR(虚拟现实)和AR(增强现实)虚拟现实(VR)碰到了难题。如果没有更高的数据速率,设备供应商将很难消除晕眩不适感,这大大地限制了 VR 的使用。毫米波技术可以帮助解决这个问题。虽然毫米波信号不能通过墙壁,只能在相对较短的距离内工作,但 VR 耳机和控制器的距离通常只有几英尺远。
    “与以前运行同一款游戏的 4G 产品相比,高分辨率的 8K 游戏流媒体肯定会更快地将电池的电量耗光。” Brunet 承认 “但是这并不是 5G 造成的,而是因为更高级的 CPU 和显示设备。在毫米波频率下,对于空中下载(OTA)的功耗需求,收发器的功耗将会以加速度级地增加。”
    自动驾驶自动驾驶需要将许多技术结合在一起,5G 就是其中之一。“更低的延迟为具有自主驾驶能力的联网车辆带来了好处,在这种情况下,响应时间至关重要,尤其是在高速公路的行驶速度下。”Woo说道。“5G 覆盖范围的改进,加上增强的本地处理能力,将允许在数据产生的终端设备附近对数据进行聚合和处理,从而减少数据传输。减少长距离的数据传输对于 5G 来说是一个重要的好处,因为它可以同时改进了延迟和功耗。”
    汽车很可能成为通讯枢纽。“汽车将是微型发射器,” Robinson 说。“和现在的 802.11p(V2V)和4G/OnStar(V2X)面临的情况类似,5G 也面临着来自电视网络的阻挠,他们希望使用已经分配给 V2V(车辆对车辆)的相同频段。他们声称,自动驾驶不需要 V2V,在路上使用 V2X(车辆对所有)就可以来了解其他车辆的存在/意图。”
    这个观点得到其他人的同意。“车辆到所有(V2X)将主导通信,车辆将成为拥有很多发射器的移动网络,在许多情况下,每个功能域都有多个发射器。” Brunet 说:“V2X 将是实现安全的关键因素,因为激光雷达或雷达根本看不到拐角处,而 5G 可以依据拐角处的反射将这种实现变得可能。”
    到底是从其他车辆还是从路边的信息中得到这些信息,现在尚没有明确的答案。“汔车能获得的信息越多,它们就越能在自动驾驶体验方面做出更好的决定。” Cadence 公司的 Louis 指出。 “更先进的自动驾驶将得益于和其他车辆/基础设施的 V2X 通信,例如路线规划和车道变更辅助。”
    新的通信能力也可能带给我们今天还没有想到的好处。“联网车辆只是众多可能受益的设备之一。” Woo 说。“更高的带宽和更高的覆盖率,再加上更强的本地处理能力,将帮助车辆与周围环境以及本地地图数据进行通信,从而在将来实现道路导航。今天的联网汽车已经捕获了大量的数据,其中一些信息被传送到云端。预计汽车将继续发展而成为信息枢纽,因为它们可以作为乘客设备的连接点(就像手机可以作为智能手表和健身设备等外围设备的连接点一样),并与其他车辆进行点对点通信,以交流像车道变化之类的预期行动,以及交通状况和危险信息。”
    实施注意事项今天大多数 5G 的实现都还只是原型,而且并不是所有的问题都已经得到解决。例如,5G 可以提供每秒10到20千兆位的传输速度,但数字系统必须具备相应的工作速度才能受益于 5G 的高速度。如果为了降低成本而仍然使用旧的工艺节点生成数字信息,这可能使得能使用的工艺节点变得有限,或者我们需要更先过的封装解决方案。
    “这是摩尔定律的一个结合,它减慢了速度,同时增加了芯片的复杂性和所需要的工艺的复杂性。。” Rambus 的产品管理高级主管 Frank Ferro 表示:“你不需要做一个大型的 ASIC,你必须问一下分解它是否更划算。做两个更小的 ASIC 或重新使用你已经进行了大量投资的混合信号器件是否更便宜?如果你已经在高速工艺技术上进行了投资,你想继续扩大规模吗?或者,您可以使用现有的技术,加上接口技术,而不必每次更改工艺节点时都开发 SerDes 呢?“
    同样,不同的产品领域可能会得出不同的结论。“我不知道 5G 节点的期望值。” Robinson 说道。“这将归结为每个产品或公司的成本
    0 留言 2019-04-22 10:31:20 奖励15点积分
  • 跨域

    一、什么是跨域?1.1 什么是同源策略及其限制内容?同源策略是一种约定,它是浏览器最核心也最基本的安全功能,如果缺少了同源策略,浏览器很容易受到XSS、CSRF等攻击。所谓同源是指”协议+域名+端口”三者相同,即便两个不同的域名指向同一个IP地址,也非同源。
    同源策略限制内容有:

    Cookie、LocalStorage、IndexedDB 等存储性内容
    DOM 节点
    AJAX 请求发送后,结果被浏览器拦截了

    但是有三个标签是允许跨域加载资源:

    <imgsrc=XXX>

    <linkhref=XXX>

    <scriptsrc=XXX>



    1.2 常见跨域场景当协议、子域名、主域名、端口号中任意一个不相同时,都算作不同域。不同域之间相互请求资源,就算作“跨域”。
    特别说明两点:

    第一:如果是协议和端口造成的跨域问题“前台”是无能为力的
    第二:在跨域问题上,仅仅是通过“URL的首部”来识别而不会根据域名对应的IP地址是否相同来判断。“URL的首部”可以理解为“协议, 域名和端口必须匹配”

    这里你或许有个疑问:请求跨域了,那么请求到底发出去没有?
    跨域并不是请求发不出去,请求能发出去,服务端能收到请求并正常返回结果,只是结果被浏览器拦截了。
    你可能会疑问明明通过表单的方式可以发起跨域请求,为什么 Ajax 就不会?
    因为归根结底,跨域是为了阻止用户读取到另一个域名下的内容,Ajax 可以获取响应,浏览器认为这不安全,所以拦截了响应。但是表单并不会获取新的内容,所以可以发起跨域请求。
    同时也说明了跨域并不能完全阻止 CSRF,因为请求毕竟是发出去了。
    1.3 跨域解决方案1.3.1 jsonpJSONP原理
    利用 <script> 标签没有跨域限制的漏洞,网页可以得到从其他来源动态产生的 JSON 数据。JSONP请求一定需要对方的服务器做支持才可以。
    JSONP和AJAX对比
    JSONP和AJAX相同,都是客户端向服务器端发送请求,从服务器端获取数据的方式。但AJAX属于同源策略,JSONP属于非同源策略(跨域请求)
    JSONP优缺点
    JSONP优点是简单兼容性好,可用于解决主流浏览器的跨域数据访问的问题。缺点是仅支持get方法具有局限性,不安全可能会遭受XSS攻击。
    JSONP的实现流程

    声明一个回调函数,其函数名(如show)当做参数值,要传递给跨域请求数据的服务器,函数形参为要获取目标数据(服务器返回的data)
    创建一个 <script>标签,把那个跨域的API数据接口地址,赋值给script的src,还要在这个地址中向服务器传递该函数名(可以通过问号传参:?callback=show)
    服务器接收到请求后,需要进行特殊的处理:把传递进来的函数名和它需要给你的数据拼接成一个字符串,例如:传递进去的函数名是show,它准备好的数据是 show(‘我不爱你’)
    最后服务器把准备的数据通过HTTP协议返回给客户端,客户端再调用执行之前声明的回调函数(show),对返回的数据进行操作

    JSONP都是GET和异步请求的,不存在其他的请求方式和同步请求,且jQuery默认就会给JSONP的请求清除缓存。
    1.3.2 corsCORS 需要浏览器和后端同时支持。IE 8 和 9 需要通过 XDomainRequest 来实现。
    浏览器会自动进行 CORS 通信,实现 CORS 通信的关键是后端。只要后端实现了 CORS,就实现了跨域。
    服务端设置 Access-Control-Allow-Origin 就可以开启 CORS。 该属性表示哪些域名可以访问资源,如果设置通配符则表示所有网站都可以访问资源。
    虽然设置 CORS 和前端没什么关系,但是通过这种方式解决跨域问题的话,会在发送请求时出现两种情况,分别为简单请求和复杂请求。
    简单请求
    只要同时满足以下两大条件,就属于简单请求

    条件1:使用下列方法之一:

    GETHEADPOST
    条件2:Content-Type 的值仅限于下列三者之一:

    text/plainmultipart/form-dataapplication/x-www-form-urlencoded

    请求中的任意 XMLHttpRequestUpload 对象均没有注册任何事件监听器; XMLHttpRequestUpload 对象可以使用 XMLHttpRequest.upload 属性访问。
    复杂请求
    不符合以上条件的请求就肯定是复杂请求了。 复杂请求的CORS请求,会在正式通信之前,增加一次HTTP查询请求,称为”预检”请求,该请求是 option 方法的,通过该请求来知道服务端是否允许跨域请求。
    1.3.3 postMessagepostMessage是HTML5 XMLHttpRequest Level 2中的API,且是为数不多可以跨域操作的window属性之一,它可用于解决以下方面的问题:

    页面和其打开的新窗口的数据传递
    多窗口之间消息传递
    页面与嵌套的iframe消息传递
    上面三个场景的跨域数据传递

    postMessage()方法允许来自不同源的脚本采用异步方式进行有限的通信,可以实现跨文本档、多窗口、跨域消息传递。
    otherWindow.postMessage(message, targetOrigin, [transfer]);
    message: 将要发送到其他 window的数据。
    targetOrigin:通过窗口的origin属性来指定哪些窗口能接收到消息事件,其值可以是字符串”*”(表示无限制)或者一个URI。在发送消息的时候,如果目标窗口的协议、主机地址或端口这三者的任意一项不匹配targetOrigin提供的值,那么消息就不会被发送;只有三者完全匹配,消息才会被发送
    transfer(可选):是一串和message 同时传递的 Transferable 对象. 这些对象的所有权将被转移给消息的接收方,而发送一方将不再保有所有权。

    1.3.4 websocketWebsocket是HTML5的一个持久化的协议,它实现了浏览器与服务器的全双工通信,同时也是跨域的一种解决方案。WebSocket和HTTP都是应用层协议,都基于 TCP 协议。
    但是WebSocket是一种双向通信协议,在建立连接之后,WebSocket的server与 client都能主动向对方发送或接收数据。
    同时,WebSocket 在建立连接时需要借助 HTTP 协议,连接建立好了之后 client 与 server 之间的双向通信就与 HTTP 无关了。
    原生WebSocket API使用起来不太方便,我们使用 Socket.io,它很好地封装了webSocket接口,提供了更简单、灵活的接口,也对不支持webSocket的浏览器提供了向下兼容。
    1.3.5 Node中间件代理(两次跨域)实现原理:同源策略是浏览器需要遵循的标准,而如果是服务器向服务器请求就无需遵循同源策略。代理服务器,需要做以下几个步骤:

    接受客户端请求
    将 请求 转发给服务器
    拿到服务器 响应 数据
    将 响应 转发给客户端

    1.3.6 nginx反向代理实现原理类似于Node中间件代理,需要你搭建一个中转nginx服务器,用于转发请求。
    使用nginx反向代理实现跨域,是最简单的跨域方式。只需要修改nginx的配置即可解决跨域问题,支持所有浏览器,支持session,不需要修改任何代码,并且不会影响服务器性能。
    实现思路:通过nginx配置一个代理服务器(域名与domain1相同,端口不同)做跳板机,反向代理访问domain2接口,并且可以顺便修改cookie中domain信息,方便当前域cookie写入,实现跨域登录。
    1.4 总结
    CORS支持所有类型的HTTP请求,是跨域HTTP请求的根本解决方案
    JSONP只支持GET请求,JSONP的优势在于支持老式浏览器,以及可以向不支持CORS的网站请求数据。
    不管是Node中间件代理还是nginx反向代理,主要是通过同源策略对服务器不加限制。
    日常工作中,用得比较多的跨域方案是cors和nginx反向代理

    参考文章
    跨域资源共享 CORS 详解
    前端面试之道
    window.postMessage
    前端常见跨域解决方案(全)
    深入跨域问题(4) - 利用代理解决跨域
    3 留言 2019-04-08 11:06:29 奖励15点积分
  • 基于Python使用词云图

    1 引言词语图,也叫文字云,是对文本出现频率较高的“关键词”予以视觉化的展现,词云图过滤掉大量的低频低质的文本信息,使得浏览者一下就可以知道文章的主旨。
    2 模块准备import jieba # 分词模块import matplotlib.pyplot as plt # 画图模块from wordcloud import WordCloud # 文字云模块from scipy.misc import imread # 处理图像的函数,用于读取并处理背景图片
    3 实现的思路准备一份需要分析的文本材料,这里选用的是 19 年两会政府工作报告,首先用 jieba 模块对文本材料进行分词处理(即识别出一个个有意义的词语),然后对处理后的材料使用WordCloud 文字云模块生成相应的词云图片即可of course,你也可以选择一张背景图片,以此为背景生成特定的云图。
    4 代码实现def wordcloud(): """ 背景图片为自定义的一个矩阵 :return: 词云图 """ # 读取词源文件 二进制的形式 with open("./govreport.txt", "rb") as f: t = f.read() # 保存为str类型 ls = jieba.lcut(t) # 进行分词 txt = " ".join(ls) # 把分词用空格连起来 # 设置词云的参数 w = WordCloud( font_path="msyh.ttc", # 设置字体 width=1000, # 设置输出的图片宽度 height=700, # 设置输出的图片的高度 background_color="white", # 设置输出图片的背景色 ) w.generate(txt) # 生成词云 w.to_file("./wordColud.png") # 将图片保存 return Nonedef wordcloud2(): """ 用指定的图片生成词云图 :return: 词云图 """ # 词源的文本文件 wf = "./govreport.txt" word_content = open(wf, "r", encoding="utf-8").read().replace("\n", "") # 设置背景图片 img_file = "./map.jpg" # 解析背景图片 mask_img = imread(img_file) # 进行分词 word_cut = jieba.lcut(word_content) # 把分词用空格连起来 word_cut_join = " ".join(word_cut) # 设置词云参数 wc = WordCloud( font_path="SIMYOU.TTF", # 设置字体 max_words=2000, # 允许最大的词汇量 max_font_size=90, # 设置最大号字体的大小 mask=mask_img, # 设置使用的背景图片,这个参数不为空时,width和height会被忽略 background_color="white", # 设置输出的图片背景色 ) # 生成词云 wc.generate(word_cut_join) # 用于显示图片,需要配合plt.show()一起使用 plt.imshow(wc) plt.axis("off") # 去掉坐标轴 plt.savefig("./wordcloudWithMap.png") plt.show() return None
    5 效果展示不带背景图片的词云图

    带有中国地图的词云图
    1 留言 2019-04-21 10:18:25 奖励12点积分
  • 使用VS2013实现修改其他程序的图标

    背景之前,写了一个程序,程序中有一个功能就是获取一个EXE程序的图标,然后,把这个图标更改为另一个EXE程序的图标。在网上搜索了很久,都没有找到相关的例子。找到的大都是修改自己程序的图标,或者是使用一个 .ico 的图标文件,去更改指定程序的图标。虽然,这些功能和自己想要开发的功能有些不同,但,也还是具有一定的参考价值的。
    现在,我就对修改指定程序图标的方式进行总结。总结出 3 种修改方式:

    替换的图标存储在自己程序的资源中
    替换的图标以 .ico 图标文件形式提供
    替换的图标是其他一个EXE程序的图标

    现把程序实现的原理以及实现的过程,写成文档分享给大家,方便大家的参考。
    函数介绍FindResource介绍
    函数声明
    HRSRC FindResource( HMODULE hModule, LPCWSTR lpName,LPCWSTR lpType );
    参数

    hModule:处理包含资源的可执行文件的模块。NULL值则指定模块句柄指向操作系统通常情况下创建最近过程的相关位图文件。lpName:指定资源名称。若想了解更多的信息,请参见注意部分。lpType:指定资源类型。若想了解更多的信息,请参见注意部分。作为标准资源类型。这个参数的含义同EnumResLangProc\lpType。
    返回值

    如果函数运行成功,那么返回值为指向被指定资源信息块的句柄。为了获得这些资源,将这个句柄传递给LoadResource函数。如果函数运行失败,则返回值为NULL。若想获得更多错误信息,请调用GetLastError函数。

    SizeofResource 介绍
    函数声明
    DWORD SizeofResource( HMODULE hModule, // module handle HRSRC hResInfo // resource handle );
    参数

    hModule:包合资源的可执行文件模块的句柄。hReslnfo:资源句柄。此句柄必须由函数FindResource或FindResourceEx来创建。
    返回值

    如果函数运行成功,返回值资源的字节数。如果函数运行失败,返回值为零。若想获得更多的错误信息,请调用GetLastError函数。

    LoadResource 介绍
    函数声明
    HGLOBAL LoadResource( HMODULE hModule, // module handle HRSRC hResInfo // resource handle );
    参数

    hModule:处理包合资源的可执行文件的模块句柄。若hModule为NULL,系统从当前过程中的模块中装载资源。hReslnfo:将被装载资源的句柄。它必须由函数FindResource或FindResourceEx创建。
    返回值

    如果函数运行成功,返回值是相关资源的数据的句柄。如果函数运行失败,返回值为NULL。若想获得更多的错误信息,请调用GetLastError函数。

    LockResource 介绍
    函数声明
    LPVOID LockResource( HGLOBAL hResData // handle to resource );
    参数

    hResDate:被装载的资源的句柄。函数LoadResource可以返回这个句柄。
    返回值

    如果被装载的资源被锁住了,返回值是资源第一个字节的指针;否则为NULL。

    BeginUpdateResource 介绍
    该函数返回一个可被UpdateResource函数使用的句柄以便在一个可执行文件中增加、删除或替换资源。
    函数声明
    HANDLE BeginUpdateResource( LPCTSTR pFileName, // executable file name BOOL bDeleteExistingResources // deletion option );
    参数

    pFileName:指向一个表示结束的空字符串指针,它是用来指定用以更新资源的基于32-位可执行文件的文件名。应用程序必须获得访问这个文件的可写权限,并且此文件在当前状态下不能被执行。如果pFileName未被指定完全路径,系统将在当前路径下搜寻此文件。bDeleteExistingResources:说明是否删除pFileName参数指定的现有资源。如果这个参数为TRUE则现有的资源将被删除,而更新可执行文件只包括由UpdateResource函数增加的资源。如果这个参数为FALSE,则更新的可执行文件包括现有的全部资源,除非通过UpdateResource特别说明被删除或是替换的。
    返回值

    如果此函数运行成功,其值将通过使用UpdateResource和EndUpdateResource函数返回一个句柄。如果被指定的文件不是一个可执行文件,或者可执行文件已被装载,或者文件不存在,或是文件不能被打开写入时,则返回值为空。若想获得更多的错误信息,请调用GetLastError函数。

    UpdateResource 介绍
    增加 删除 或替文件中的资源。
    函数声明
    BOOL UpdateResource( HANDLE hUpdate, // update-file handle LPCTSTR lpType, // resource type LPCTSTR lpName, // resource name WORD wLanguage, // language identifier LPVOID lpData, // resource data DWORD cbData // length of resource data );
    参数

    hUpdate:指定更新文件句柄。此句柄由BeginUpdateResource函数返回。lpType:指向说明将被更新的资源类型的字符串,它以NULL为终止符。这个参数可以是一个通过宏MAKENTRESOURCE传递的整数值,含义参见EnumResLangProc\lpType。lpName:指向说明待被更新的资源名称的字符串,它以NULL为终止符。这个参数可以是一个通过宏MAKEINTRESOURCE传递的整数值。wLanguage:指定将被更新资源的语言标识。要了解基本的语言标识符以及由这些标识符组成的字语言标识符的列表,可参见宏MAKELANGID。lpData:指向被插入可执行文件的资源数据的指针。如果资源是预定义类型值之一,那么数据必须是有效且适当排列的。注意这是存储在可执行文件中原始的一进制数据,而不是由Loadlcon,LoadString或其他装载特殊资源函数提供的数据。所有包含字符串、文本的数据必须是Unicode格式;IpData不能指向ANSI数据。如果lpData为NULL,所指定的资源将从可执行文件中被删除。cbData:指定lpData中的资源数据数据大小,以字节计数。
    返回值

    如果函数运行成功,返回值为非零;如果函数运行失败,返回值为零。若想获得更多的错误信息,请调用GetLastError函数。

    EndUpdateResource 介绍
    终止在可执行文件中的资源更新。
    函数声明
    BOOL EndUpdateResource( HANDLE hUpdate, // update-file handle BOOL fDiscard // write option );
    参数

    hUpdate:用于资源更新的句柄。此句柄通过BeginUpdateResource函数返回。fDiscard:用来说明是否向可执行文件中写入资源更新内容。如果此参数为TRUE,则在可执行文件中无变化;如果此参数为FALSE,则在可执行文件中写入变化。
    返回值

    如果函数运行成功,并且通过调用UpdateResource函数指定的不断积聚的资源修正内容被写入指定的可执行文件,那么其返回值为非零。如果函数运行失败,其返回值为零。若想获得更多的错误信息,请调用GetLastError函数。

    程序实现原理有一个知识点大家需要了解的就是:
    EXE程序会默认把资源ID号最小的ICON类型的图标资源作为自己的程序图标。
    也就是说,要想更改EXE程序的图标,只需要把一个图标创建或替换程序中ICON类型的资源ID最小的图标就可以了。这样,程序就会默认把该图标资源作为自己的程序图标,显示出来。
    那么,图标修改的大致原理就是:

    获取替换图标的图标数据的存储地址
    使用 UpdateResource 函数定位出程序 RT_ICON 类型、资源ID为 1 的图标资源,并使用上述的替换图标数据进行创建或者替换

    这样,就成功替换或者创建了一个资源类型是 RT_ICON,而且资源ID为 1 的最小的图标资源。要注意的是,最小的资源ID号是以 1 开始计数,而不是以 0 开始的,这个需要注意。
    那么,对于上面提到的 3 种方式,主要是获取图标数据的方式上的区别而已。
    针对第 1 种方式:替换的图标存储在自己程序的资源中。获取图标数据的原理就是:

    首先,我们先定位到我们程序里的资源,主要是根据“资源类型”和“资源名称”进行定位,获取资源信息块的句柄
    然后,根据上面获取的资源信息块的句柄,把资源加载到我们程序的内存中
    最后,锁定加载到内存中的资源的内存,防止程序的其他操作影响到这块内存,获取图标数据的首地址

    针对第 2 种方式:替换的图标以 .ico 图标文件形式提供。获取图标数据的原理就是:

    首先,以读方式打开文件,并获取文件的大小
    然后,将文件数据内容全部都出去出来,这样便获取到了图标数据

    针对第 3 种方式:替换的图标是其他一个EXE程序的图标。获取图标数据的方法和第 1 种方式的方法类似,原理是:

    首先,我们先把想要提取图标的EXE将在到我们的程序中来,并获取加载句柄
    然后,根据程序加载句柄,定位到程序里的资源,主要是根据“资源类型”和“资源名称”进行定位,获取资源信息块的句柄
    接着,根据上面获取的资源信息块的句柄,把资源加载到我们程序的内存中
    最后,锁定加载到内存中的资源的内存,防止程序的其他操作影响到这块内存,获取图标数据的首地址

    编码实现在这 3 种方式的图标数据获取中,最麻烦,而且最能体现技术含量的就是第 3 中形式。所以,我们以第 3 种实现方式为例,写出实现的代码。对于其余两种,网上有很多相关的例子和参考资料,相信大家根据原理部分应该自己也可以独立写出来,在此就不给出代码了。
    BOOL ChangeIcon(char *pszChangedIconExeFileName, char *pszSrcIconExeFileName){ // 将在其他程序,并获取程序模块句柄 HMODULE hEXE = ::LoadLibrary(pszSrcIconExeFileName); if (NULL == hEXE) { FreeRes_ShowError("LoadLibrary"); return FALSE; } // 获取其他EXE程序图标资源数据 HRSRC hRsrc = ::FindResource(hEXE, (LPCSTR)1, RT_ICON); if (NULL == hRsrc) { FreeRes_ShowError("FindResource"); return FALSE; } // 获取资源大小 DWORD dwSize = ::SizeofResource(hEXE, hRsrc); if (0 >= dwSize) { FreeRes_ShowError("SizeofResource"); return FALSE; } // 加载资源到程序内存 HGLOBAL hGlobal = ::LoadResource(hEXE, hRsrc); if (NULL == hGlobal) { FreeRes_ShowError("LoadResource"); return FALSE; } // 锁定资源内存 LPVOID lpVoid = ::LockResource(hGlobal); if (NULL == lpVoid) { FreeRes_ShowError("LockResource"); return FALSE; } // 开始修改图标 HANDLE hUpdate = ::BeginUpdateResource(pszChangedIconExeFileName, FALSE); if (NULL == hUpdate) { FreeRes_ShowError("BeginUpdateResource"); return FALSE; } // 如果资源ID存在, 则替换资源; 否则创建资源 // 程序把ICON的最小的资源ID作为程序图标, 所以从1开始, 1最小 BOOL bRet = ::UpdateResource(hUpdate, RT_ICON, (LPCSTR)1, LANG_NEUTRAL, lpVoid, dwSize); if (FALSE == bRet) { FreeRes_ShowError("UpdateResource"); return FALSE; } ::EndUpdateResource(hUpdate, FALSE); // 释放模块 ::FreeLibrary(hEXE); return TRUE;}
    程序测试现在,我们在 main 函数中,调用上述封装好的函数进行测试,把 “520.exe” 程序的图标改成 “360.exe” 程序的图标。
    main 函数为:
    int _tmain(int argc, _TCHAR* argv[]){ // 把 520.exe 的图标更改为 360.exe 的图标 if (FALSE == ChangeIcon("C:\\Users\\DemonGan\\Desktop\\520.exe", "C:\\Users\\DemonGan\\Desktop\\360.exe")) { printf("Change Icon Error!\n"); } else { printf("Change Icon OK!\n"); } system("pause"); return 0;}
    在测试之前,“360.exe” 程序和 “520.exe” 程序图标如下图所示:

    运行修改图标的程序,提示执行成功:

    然后,再次查看 “360.exe” 程序和 “520.exe” 程序,发现 “520.exe” 图标成功更改了:

    由此可见,程序测试成功。
    总结改程序的核心原理就是:EXE程序会默认把资源ID号最小的ICON类型的图标资源作为自己的程序图标。
    其中,测试的时候,需要注意的一个问题就是,图标更改成功后,可能会看到图标仍然没有更改。那可能是因为系统缓存了原来程序的图标,并没有更新图标缓存的缘故。最简单的方法就是,把修改图标后的程序,拷贝到其他目录下进行查看,这样操作比较简便些。
    参考参考自《Windows黑客编程技术详解》一书
    2 留言 2018-11-19 09:19:19 奖励8点积分
  • C语言-基于Huffman编码原理的译码解压缩程序

    C语言实现基于Huffman编码原理的译码解压缩程序
    huffman编码原理这里不做介绍,梳理一下解码的代码实现。
    解码解压缩部分输入已编码文本和编码表,输出解码文本。

    1.获取已编码文本void readfile(int op){ FILE* fp; int text_i = 0; char tempc; fp = fopen(".\\outfile.txt", "r"); if (fp != NULL) { while (fscanf(fp, "%c", &tempc) != EOF) { de_str_text[text_i] = tempc; text_i++; } de_str_text[text_i] = '\0'; fclose(fp); } else { printf("文件读取失败!"); getchar(); exit(0); }}2.获取码表并生成huffman树void bulid_detree(){ FILE *fp; de_root = (Hufftree *)malloc(sizeof(Hufftree)); Hufftree* p = de_root; Hufftree* t; int tag = 0; de_root->lchild = de_root->rchild = NULL; int i = 0, j = 1, k = 0; fp = fopen(".\\mabiao.txt", "r"); if (fp) { for (i = 0; i < charmaxsize; i++) { if ((fgets(mabiao[i], codesize, fp)) == NULL) { break; } else if (tag) { for (k = 0; k < codesize - 1; k++) { mabiao[i][codesize - 1 - k] = mabiao[i][codesize - 2 - k]; } mabiao[i][0] = mabiao[i - 1][0]; tag = 0; } if (mabiao[i][1] == '\0') { tag = 1; } } fclose(fp); } else { printf("码表读取失败"); return; } i = 0; while (mabiao[i][0] != '\0') { p = de_root; while (mabiao[i][j] != '\0') { if (mabiao[i][j] == '1') { if (p->rchild == NULL) { t = (Hufftree*)malloc(sizeof(Hufftree)); t->lchild = t->rchild = NULL; p->rchild = t; p = p->rchild; } else { p = p->rchild; } } if (mabiao[i][j] == '0') { if (p->lchild == NULL) { t = (Hufftree*)malloc(sizeof(Hufftree)); t->lchild = t->rchild = NULL; p->lchild = t; p = p->lchild; } else { p = p->lchild; } } j++; } p->character = mabiao[i][0]; j = 1; i++; } }3.已编码文本转化成伪二进制流void ConvertoBit(){ int bit_i = 0, str_i = 0,i = 0; char temp[7]; int asc; Hufftree *seek; //转化为伪二进制数组 for (str_i = 0; str_i < strmaxsize; str_i++) { if (de_str_text[str_i] != NULL) { asc = (int)de_str_text[str_i]-31; for (i = 0; i < 6; i++) { if (asc % 2 == 0) { temp[5 - i] = '0'; } else { temp[5 - i] = '1'; } asc = asc / 2; } temp[6] = '\0'; strcat(de_strbit_temp, temp); } else { break; } } puts(de_strbit_temp);}4.根据伪二进制流查找huffman树输出字符void decode(){//按照码表转换成原文本 seek = Root; str_i = 0; while (de_strbit_temp[bit_i] != NULL) { if (seek->character != NULL) { de_strcode[str_i] = seek->character; str_i++; seek = Root; } else if (de_strbit_temp[bit_i] == '1') { seek = seek->rchild; bit_i++; } else { seek = seek->lchild; bit_i++; } } de_strcode[str_i] ='\0';}输出解码文档void outfile(int op){ int i = 0; FILE* fp; fp = fopen(".\\decodefile.txt", "w"); fputs(de_strcode, fp); fclose(fp);}至此完成解码部分tip由于解码时,伪二进制流不足6位时会补零可能会导致出现多余字符。
    附:结构体定义//字符编码信息typedef struct Huff_char { char character = 0; int count = 0; char code[20];}Huff_char;//字符队列typedef struct Huffchar { char character = 0; int count = 0;//统计词频 struct Hufftree* self = NULL;}Huffchar;//哈夫曼树节点typedef struct Hufftree { char character;//节点字符信息 int power;//权值 struct Hufftree* self, *rchild, *lchild;//自身地址,左右孩子}Hufftree;
    1 留言 2018-11-09 21:04:01 奖励5点积分
  • 上传资源,获取积分 精华

    上传资源,获取积分“WRITE-BUG技术共享平台”是一个专注于校园计算机技术交流共享的平台,面向的主要目标群体是我们计算机相关专业的大学生。在平台上,大家既可以交流学校课内学习的心得体会,也可以分享自己课外积累的技术经验。
    为了充实平台的资源库,更好地服务于各位同学,平台决定推出“众源计划”,有偿征集同学们自己计算机专业的作业、课程设计或是毕业设计等资源。“众源计划”的主要目的是创建一个具有一定规模的“技术资源库”,资源库里的每一份资源,都必须有详细的开发文档和可编译的源代码。
    作业、课程设计或是毕业设计等资源是同学们自己辛苦付出的成果,也是自己技术进步的见证。这部分资源通常都有详细的开发文档和完整的程序源代码,能够帮助其他初学者更好地消化和吸收将要学习的技术,降低学习门槛。所以,平台决定积分奖励征集这些资源。
    具体要求活动对象
    在校或者已毕业的计算机相关专业大学生,院校不限
    奖励方式
    资源上传并审核通过后,根据资源质量,奖励每贴 10 - 100 点积分
    上传流程
    会员登录自己的账号上传资源
    资源上传后,管理员会在 24 小时之内审核资源
    审核通过后,管理员会立即发放奖励积分至所对应账户

    审核重点
    重点审核资源是否具有详细的文档和完整的源代码
    审查资源是否原创,切勿重复提交

    资源要求“众源计划”仅对两类资源进行积分奖励征集,分别是“课内资源”和“课外资源”,各类资源具体要求如下所示。

    课内资源

    内容范围:计算机相关专业课内的毕业设计、课程设计、小学期、大作业等课程内开发的程序,程序包括游戏、PC程序、APP、网站或者其他软件形式
    内容要求:资源必须要包括完整的程序源代码和详细的开发文档或报告
    具体“课内资源”征集程序列表见附录一

    课外资源

    内容范围:计算机相关专业的课外自己主导研究游戏、项目、竞赛、个人研究等,区别于课程设计和毕业设计等课内资源
    内容要求:资源必须要包括完整的程序源代码和详细的开发文档或报告
    具体“课外资源”征集程序列表见附录二


    附录一注意:“众源计划”的题目范围包括且不限于以下题目

    汇编语言课程设计题目列表

    屏幕保护程序分类统计字符个数计算机钢琴程序字符图形程序音乐盒程序电子闹钟程序俄罗斯方块打字游戏图形变换程序吃豆子程序其他
    C语言课程设计题目列表

    学生成绩管理系统图书信息管理系统设计销售管理管理系统飞机订票管理系统教师评价系统学校运动会管理系统文本文件加密技术英语字典电话簿管理系统流星雨的实现其他
    C++语言课程设计题目列表

    学生学籍管理系统高校人员信息管理系统学生成绩管理系统车辆管理系统职工工作量统计系统学生考勤管理系统单项选择题标准化考试系统图书管理系统超市商品管理系统模拟ATM机存取款管理系统其他
    JAVA语言课程设计题目列表

    简单投票管理系统数学练习题目自动生成系统华容道小游戏电子英汉词典加密与解密标准化考试系统排球比赛计分系统学籍管理系统绘图系统图书信息管理系统其他
    C#语言课程设计题目列表

    学生信息管理系统学生综合测评系统图书管理系统学校运动会管理系统个人通讯录管理系统教师工资管理系统教师工作量管理系统趣味小游戏物资库存管理系统图形图像处理系统其他
    JSP语言课程设计题目列表

    微博系统基于web的学生信息管理系统在线计算机等级考试报名系统在线问卷调查系统网上销售系统论坛系统图书借阅管理系统网上购物系统工资管理系统酒店管理系统其他
    数据结构与算法课程设计题目列表

    设计哈希表实现电话号码查询系统电报压缩/解压缩系统电费核算系统机房计费管理系统公交线路查询系统用二叉平衡树实现图书管理系统运动会赛事安排动态表达式求值用线性结构实现学生成绩管理求解迷宫问题其他
    编译原理课程设计题目列表

    First集和Follow集生成算法模拟LL(1)分析过程模拟FirstVT集和LastVT集生成算法模拟算符优先分析表生成模拟算符优先分析过程模拟LR分析过程模拟PL/0语言的词法分析程序C语言的预处理程序自动机的状态转换图表示数组越界检查工具其他
    操作系统课程设计题目列表

    动态分区分配方式的模拟进程调度模拟算法请求调页存储管理方式的模拟P、V操作及进程同步的实现银行家算法SPOOLING假脱机输出的模拟程序文件系统设计动态不等长存储资源分配算法磁盘调度算法处理机调度算法模拟其他
    数据库课程设计题目列表

    高校学籍管理系统在线投稿审稿管理系统产品销售管理系统高校人力资源管理系统高校课程管理系统酒店客房管理系统报刊订阅管理系统医药销售管理系统学生学籍管理系统餐饮管理系统其他
    计算机网络课程设计题目列表

    TCP通信功能实现网络游戏的开发基于UDP协议网上聊天程序Ping 程序的实现数据包的捕获与分析FTP客户端设计包过滤防火墙的设计与实现简单的端口扫描器简单Web服务器的设计与实现HTTP客户端的设计与实现其他
    软件工程课程设计题目列表

    学校教材订购系统网上选课管理系统简易办公系统图书馆管理系统校园交流论坛网站超市收银系统ATM柜员机模拟程序企业办公自动化管理系统学生成绩管理系统进销存管理系统其他
    VC++程序设计课程设计题目列表

    模拟时钟程序单向链表的操作演示程序电影院售票系统俄罗斯方块五子棋24点游戏背单词软件的设计与实现酒店管理系统餐厅就餐管理系统吹泡泡游戏其他
    其他课程设计

    PHP语言课程设计PYTHON语言课程设计计算机图形学课程设计机器学习课程设计密码学课程设计其他

    附录二注意:“众源计划”的题目范围包括且不限于以下题目

    人脸识别系统车牌识别系统旅游自助APP疲劳驾驶识别检测系统考试管理系统WINDOWS驱动级安全防御系统WINDOWS平台逆向调试器坦克大战小游戏情感分析系统人机博弈的国际象棋游戏其他
    最终解释权归 WRITE-BUG技术共享平台 所有
    11 留言 2018-12-20 10:09:45 奖励100点积分
  • 基于Python使用TensorBoard可视化工具

    首先展示一下代码:
    writer = tf.summary.FileWriter("./summary/linear-regression-0/", sess.graph)
    我们将数据流图的事件保存到当前目录下的/summary/linear-regression-0/中,记得后面关闭一下 writer.close(),我们来看一下运行后的结果。

    那么我们现在需要如何显示呢?写这个目的就是告诉到家是如何使用 TensorBoard 可视化的,步骤如下:

    Windows+R 打开 Terminal
    cd 到 summary 目录(注意不是 cd 到 linear-regression-0 目录)
    使用 TensorBoard —logdir=linear-regression-0

    我们来看一下图应该就能明白:

    如果打不开,中间的 DESKTOP-A0JMDP5 换成 localhost 试试!好了!现在让我们输入网址试试:
    1 留言 2019-04-20 12:26:23 奖励12点积分
  • 线程同步之临界区

    背景在学校学习《操作系统》这门课程的时候,进程线程的调度同步都是比较重要的知识点。关于线程的同步方法有很多种,例如互斥量、信号量、事件、临界区等。
    本文就从编程实现的角度谈谈编程中临界区在多线程同步中的使用方式,现在,把实现的思路和过程写成文档,分享给大家。
    实现原理有多个线程试图同时访问临界区,那么在有一个线程进入后其他所有试图访问此临界区的线程将被挂起,并一直持续到进入临界区的线程离开。临界区在被释放后,其他线程可以继续抢占,并以此达到用原子方式操作共享资源的目的。
    临界区在使用时以CRITICAL_SECTION结构对象保护共享资源,并分别用WIN32函数EnterCriticalSection 和 LeaveCriticalSection 去进入和离开一个临界区。
    所用到的CRITICAL_SECTION结构对象必须要事先经过 InitializeCriticalSection 的初始化后才能使用,而且必须确保所有线程中的任何试图访问此共享资源的代码都处在此临界区的保护之下。否则临界区将不会起到应有的作用,共享资源依然有被破坏的可能。
    临界区使用完毕之后,要使用 DeleteCriticalSection 函数删除临界区,释放资源。
    编码实现为了更好理解临界区的概念,我们开发了这样一个程序:开启 3 个线程,每个线程都循环打印显示同一个整型全局变量,并将它自增1。要求,每个数字只显示一遍,而且是按 1 递增显示。
    如果我们什么操作不加的话,直接创建 3 个多线程去打印显示的话,代码如下:
    UINT ThreadProc(LPVOID lpVoid){ while (TRUE) { printf("%d\n", g_iCount); g_iCount++; if (g_iCount > iStop) { break; } } return 0;}BOOL CriticalSectionTest(){ // 多线程1 HANDLE hThread1 = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, NULL, 0, NULL); // 多线程2 HANDLE hThread2 = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, NULL, 0, NULL); // 多线程3 HANDLE hThread3 = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, NULL, 0, NULL); // 等待线程结束 ::WaitForSingleObject(hThread1, INFINITE); ::WaitForSingleObject(hThread2, INFINITE); ::WaitForSingleObject(hThread3, INFINITE); // 关闭句柄 ::CloseHandle(hThread1); ::CloseHandle(hThread2); ::CloseHandle(hThread3); return TRUE;}
    那么,上面程序的显示结果如下图所示:

    可以看到,输出的每个数字并不是都唯一,有些都重复输出,而且并不是按递增 1 的规律输出的。原因细想一下应该就明白了,创建了 3 个多线程,有可能会出现当线程1执行完打印输出的时候,线程2也执行到打印输出,这是全局变量g_iCount值还是0,所以便会重复输出。而它们也会同时执行g_iCount++,那么之后的输出就不会按递增1进行输出了。
    所以,为了每次输出和全局变量递增的时候,只能有一个线程去的独占执行,这时就需要临界区的帮助了。
    加入临界区后的代码就变成下面这样子:
    UINT ThreadProc(LPVOID lpVoid){ while (TRUE) { // 进入临界区 ::EnterCriticalSection(&g_cs); printf("%d\n", g_iCount); g_iCount++; if (g_iCount > iStop) { break; } // 离开临界区 ::LeaveCriticalSection(&g_cs); } return 0;}BOOL CriticalSectionTest(){ // 初始化临界区 ::InitializeCriticalSection(&g_cs); // 多线程1 HANDLE hThread1 = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, NULL, 0, NULL); // 多线程2 HANDLE hThread2 = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, NULL, 0, NULL); // 多线程3 HANDLE hThread3 = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, NULL, 0, NULL); // 等待线程结束 ::WaitForSingleObject(hThread1, INFINITE); ::WaitForSingleObject(hThread2, INFINITE); ::WaitForSingleObject(hThread3, INFINITE); // 关闭句柄 ::CloseHandle(hThread1); ::CloseHandle(hThread2); ::CloseHandle(hThread3); // 删除临界区 ::DeleteCriticalSection(&g_cs); return TRUE;}
    代码的执行效果如下所示:

    这时,输出满足要求了,每个数字只显示 1 遍,而且是递增 1 输出。也就说明临界区起到了同步的效果,每次显示和递增,都保证了只能一个线程执行这部分的代码。
    总结在编程开发中,使用临界区之前一定记得要进行初始化操作,使用完毕后,要删除临界区。
    2 留言 2018-11-07 10:59:10 奖励5点积分
  • 使用VS2013编译Crypto++加密库

    背景近期写了一个关于AES和RSA加解密的小程序,其中加解密模块使用开源的Crypto++加密库写的。在程序使用Crypto++加密库之前,需要下载Crytpo++加密库的源码到本地,自己编译,得到库文件。
    当时的最新版Crypto++版本是5.6.5版本,支持到VS2010开发环境编译,所以,我使用VS2013去编译的话,还需要项目进行一些编译的设置。
    现把当时编译的过程详细描述出来,形成文档,方便有需要的人们参考。
    准备工作首先,你要有个VS2013开发环境。因为本文讲解的是使用VS2013开发环境进行编译的,理论上VS2010、VS2012、VS2015、VS2017等应该也适用的。
    其次,需要到Crypto++官网 (https://www.cryptopp.com) 下载Crypto++库的源码。本文以“Crypto++ 5.6.5”版本为例进行讲解。
    编译过程1. 升级解决方案将“Crypto++ 5.6.5”版本Crypto++库源码下载下来后,进行解压缩。使用VS2013开发环境打开“cryptest.sln”解决方案文件。VS2013提示说“升级VC++编译器和库”,这时我们点击“确定”。因为Crypto++项目工程原来使用VS2010开发的,现在我们使用VS2013重新进行编译,所以要对项目进行升级。

    升级完成后,我们可以在左侧看到项目列表,其中“cryptlib”项目工程就是我们将要进行编译的项目工程。

    2. 更改项目工程平台工具集选中要编译的“crytplib”项目工程,鼠标右击,选中“属性”,打开“属性页”。然后,在“平台工具集”的选项中,选择“Visual Studio 2013 - Windows XP (v120_xp)”,表示兼容XP平台,也就是在XP系统下,调用此Crypto++库文件,也能正常运行。

    3. 重新生成在设置完成之后,右击“cryptlib”项目工程,选择“重新生成(E)”。

    这样,就可以在生成目录下找到生成的库文件了。

    我们可以选择不同的编译模式:Debug模式或是Release模式,选择不同的位数:Win32或是x64,来生成我们所需的库文件。
    4. 加载到程序中我们编译得到库文件“cryptlib.lib”库文件之后,然后,把程序所用加密算法的头文件如:AES加密算法所需的头文件“aes.h”、RSA加密算法所需的头文件“rsa.h”、“randpool.h”、“hex.h”、“files.h”;连同库文件“cryptlib.lib”文件一起拷贝到我们自己的程序目录下,并导入到程序中。这样,我们就可以直接调用Crypto++加密库的函数了。

    总结步骤不是很复杂,要注意一点的是,注意编译的时候,“运行库”对应的设置,这样在自己程序调用编译好的Crypto++库的时候,也要选择相应的模式,进行相应的“运行库”设置。
    参考参考自《Windows黑客编程技术详解》一书
    2 留言 2018-11-06 22:04:18 奖励3点积分
  • 大数据自然语言处理12、NLP

    前文链接:https://write-bug.com/article/2367.html
    文本相似度一般在讨论相似度时,我们讨论相似度与距离之间的关系:相似度越低,距离越远
    我们这里先分为两类:
    语义相似:

    个人简介
    人物介绍

    解决方案:协同过滤(用户行为)、回归算法
    eg:用户搜索歌神时,大部分人点击张学友,由此两个词间便存在了某种联系
    字面相似:

    我吃饱饭了
    我吃不饱饭

    解决方案:LCS最长公共子序列、中文分词(词:token)
    eg:通过token列出词粒度集合,通过算法打分或者取交集求取相似度
    解决字面相似:
    1.cosine余弦相似度
    以前在中学学习余弦时都是基于二维坐标系:
    cos(θ)=a·b/|a|* |b|我们词粒度的维度是由token数量决定的,由此我们拓展到n维得:

    • 句子A:(1,1,2,1,1,1,0,0,0)
    • 句子B:(1,1,1,0,1,1,1,1,1)

    a,b即通过分词后得到的词粒度集合向量化
    注意:由分词得到的去重集合(BOW bag of word)一旦列举出后顺序不能变
    计算流程:

    通过TFIDF找出两篇文章的关键词
    每篇文章各取出若干个关键词,合并成一个集合,计算每篇文章对于这个集合中的 词的词频
    生成两篇文章各自的词频向量
    计算两个向量的余弦相似度,值越大就表示越相似

    TFIDF:找关键词
    1)TF(Term Frequency):词频
    即假设如果一个词很重要,那么它在这篇文章中会多次出现。但是,出现最多的不一定是最重要的,比如停用词:“的”“是”“在”。。。(可提前过滤)那么再次假设关键词:在当前文章出现较多,但在其他文章出现少。
    公式:
    TF1=某词在文章中出现次数 / 文章词总数(基数大) 越大越重要TF2=某词在文章中出现次数 / 该文章次数最多的词的出现的次数(基数小)TF越大,越重要
    2)IDF:反文档频率,即某一个词在某些文章中出现个数多少的衡量
    公式:
    log(语库文档总数 / 包含该词的文档数+1) > 0单调函数,x越大,y越平滑。出现少,分母越小,IDF越大,词越重要。
    score= TF*IDF像停用词这样的词,TF大,IDF小
    拓展:自动摘要
    1)确定关键词集合
    两种方法:
    1.score—Top10
    2.阈值截断>0.8
    2)那些句子包含关键词,取出这些句子
    3)对关键词排序,对句子做等级划分(含该关键词的分数线性加和)
    4)把等级高的句子取出来,就是摘要
    优化:关键词算法、中心思想、第一末尾自然段等等。
    tfidf:

    优点:简单快速,结果比较符合实际状况
    缺点:缺少通顺度,而且关键词不是唯一的权重,比如开头结尾,中心思想。

    单纯以“词频”做衡量标准,不够全面,有时重要的词可能出现的次 数并不多
    实践:
    1.分词
    2.数据预处理,把所有文章内容全部收集到一个文件中,每行为一篇文章
    convert.py input_tfidf_dir/ >convert.data3.MR批量计算IDF

    map.py: word list去重,每一个单词在一篇文章中出现一次计1
    reduce.py: wordcount + IDF计算

    得到token,score集合
    排序:cat part-00000 |sort -k2 -nr |head
    2.LCS 最长公共子序列:顺序性,不连续
    从字面的角度,衡量字面相似度的方法之一
    应用:

    推荐场景 , item推荐,推荐列表—-保持多样性,满足新鲜感需求,放在推荐引擎中可方便调控相似度阈值过滤(即候选集合后)
    辨别抄袭
    生物学家常利用该算法进行基因序列比对,以推测序列的结构、功能和演化过程

    – 字符串12455与245576的最长公共子序列为2455
    – 字符串acdfg与adfc的最长公共子序列为adf
    计算方法:
    暴力穷举:abc排列组合,取LCS,穷举搜索法时间复杂度O(2^m ∗ 2^n);
    动态规划法:
    抽象公式:

    当xy两字符串最后一位相等时+1,并循环求lcs,不相等时求左右各减一位的最大值,如果有一方为空循环停止,值为0
    代码二维数组:

    i和j为行和列,从上到下从左到右看,最开始都为空,行列都为0,接着从左到右有相同项左上角+1,不相同的左和上取最大值,最后得出最大序列数
    score=4len(x)=7len(y)=6sim(x,y)=4*2/6+7=0.615*2为了归一化实践:
    通过数组维护tokenlist,左右两个字符串按照上面形式计算。
    lcs适合粗粒度过滤,适合场景没有cos广泛因为机器学习中涉及到cos的地方太多了
    3.中文分词中文并不像英文,每个单词自动空格分隔,并且没有一个衡量标准好坏问题,词粒度分割不同,意思也就不同。
    我们前面一直提到搜索和推荐场景,在这里,推荐适合粗粒度的分词,因为需要关心词的语义去推荐,而搜索适合细粒度的分词场景,侧重于召回更多的item到候选集合。
    表示方法:
    那我们如果对于一句话做分词,人类可以一眼就看出来,那么对于计算机来说怎么表示,那我们假设,切开位置表示为1,其他位置为0;那么这句话表示为:有/意见/分歧——》11010本田雅阁/汽车——》100010那么这种表示方法对计算机确实很友好,但是对我们来说很头疼,于是通常情况下,我们用分词节点序列表示:有/意见/分歧——》{0,1,3,5}
    那么接下来的问题,这串索引怎么得到呢?这里有一种原始方法:最大长度查找(字典匹配)这个又分两种:前向查找,后向查找。加载词典:
    在查找过程中我们如果一个词一个词去匹配未免过于耗费资源,所以这里用一个加速查找字典的方法:trie树—数据结构 查找过程如下:• 从根结点开始一次搜索;• 取得要查找关键词的第一个字母,并根据该字母选择对应的子树并转到该子树继续进行检索;• 在相应的子树上,取得要查找关键词的第二个字母,并进一步选择对应的子树进行检索。• 迭代过程……• 在某个结点处,关键词的所有字母已被取出,则读取附在该结点上的信息,即完成查找。其他操作类似处理.时间复杂度 O(n)—-空间换时间数据结构,并且在python中的字典数据结构这种速度是近乎于O(1)
    匹配词典:
    前面这是加载词典,而匹配词典是下面几种方法:1、最大长度查找前向查找:先从头匹配第一个字,之后看第二个字是否在第一个字后面,以此类推,若不在则砍一刀,之后继续查找当前第一个字。后向查找:即语料库词为倒序,即反向建库通常情况下后向查找会更符合语义。2、我们把每个字都分开,之后在语料库中匹配出每种排列组合的可能,用前面说的索引序列表示,与此同时,我们把每种可能都用一条线首尾相连,而这么多条线就构成了一张图:即有向无环图,也叫DAG图。
    由此一句话的编码表示:DAG: {0: [0, 1, 3], 1: [1], 2: [2, 3, 5], 3: [3], 4: [4, 5], 5: [5], 6: [6, 7], 7: [7]}
    那么我们这些边该如何去切分和选择呢?
    由此引出:语言模型概念
    我们由语料库已经有了很多列举后的切分方案,那么我们由这些切分方案可以得到每条方案的条件概率,比如:C=本田雅阁汽车S1=本田/雅阁/汽车S2=本田雅阁/汽车则:P(S1|C)表示S1这条切分方案的条件概率
    目标:我们希望得到的就是最大概率的那条切分方案。
    那我们如何求取这条概率呢?:贝叶斯公式
    p(s|c)=p(c|s)p(s)/p(c)
    推导:p(s|c) p(c)=p(s,c)—联合概率(同时发生的概率)->p(c|s)p(s)=p(c,s)-> p(s|c) p(c)= p(c|s)p(s)-> p(s|c)= p(c|s)p(s)/p(c)
    P(c):即这句话的概率,但是人类说每句话的概率都是相同的,即常量。所以有:p(s|c)= p(c|s)p(s)P(c|s):即p(本田雅阁汽车|本田/雅阁/汽车)=100%,即不管哪种切分方案都可以还原出原始的句子。所以有:p(s|c)= p(s)目标:P(S)独立性假设,每个词之间无任何联系,都是独立的那么有:p(本田/雅阁/汽车)=p(本田)p(雅阁)p(汽车)也就是每个词token的概率:即TF——word count由于P(S1)>p(S2),所以选P(S1)
    那么由此我们选择概率最大的那条路线,就是这条句子的切分最佳方案。
    但是,我们概率相乘是很容易形成很小的小数的,造成向下溢出,所以有:
    求log后因为概率是小于1的数所以为负数,可比性高,并且加法运行速度比乘法更快。
    我们前面是假设每个token前后是无关联关系的,但在实际生活中词与词都是有关联的,所以我们刚才的概率模型就是一元模型(只考虑到了词频概率)。
    多元模型:一元模型(Unigram) :一个词的出现不依赖于它前面出现的词—3个参数P(S)=P(w1) P(w2)P(w3)…P(wn)二元模型(Bigram):一个词的出现仅依赖于它前面出现的一个词—3+6个参数P(S)=P(w1) P(w2|w1)P(w3|w2)…P(wn|wn-1) 三元模型 (Trigram):简化成一个词的出现仅依赖于它前面出现的两个词。3+6+。。
    每次参数排列组合爆炸级增长。
    J i e b a 分词简介官方链接:https://github.com/fxsjy/jiebajieba用于中文分词,支持的文本编码格式为utf-8,支持的功能包括:中文分词、关键字提取、词性标注整体功能如下图:
    框架结构:
    第一阶段:加载词库,用trie树分割,生成句子中汉字所有可能成词情况所构成的DAG图—-FULL
    词表格式: token,TF,词性 ——-》trie树——>DAG图——》找概率Route概率:获得词频最大切分P= TF / total(TF总和)—》log(p)—-》负数总词数:Linux:awk汉字计数:cat dict.txt | awk ‘BEGIN{a=0}{a+=$2}END{print a}’—— 60101967(total)
    第二阶段:动态规划或者贝叶斯计算最大路径概率,找出最大切分组合,Token英文中文识别我们前面得到了每条边和每个汉字的概率(即所有可能词的概率),接着用倒序索引找最大路径(类似开始说的第二种表示方法)route: {0: (-33.271126717488308, 1),1: (-32.231489259807965, 1),2: (-23.899234625632083, 5),3: (-31.52324681384394, 3),4: (-22.214895405024865, 5),5: (-19.00846787368323, 5),6: (-8.7800749471799175, 7),7: (-8.800692190498415, 7), 8: (0.0, ‘’)}从后向前导,每种切分方案是单个字概率大还是词概率大,确定后的方案概率不变,继续向前导,求最大方案,并累加概率值,以此类推。所以最后方案:0-1,2-5,6-7第三阶段:所有单个汉字的传给——>HMM模型粘贴词库中没有的被切碎的词(未登录词)—->再传回来改回为词更新句子,Viterbi动态规划算法HMM:隐马尔科夫模型
    实践:

    MR批量分词Client本地:代码、jieba包Datanode:数据文件分发—file模式—-代码内解压
    2.pyweb+分词Get、Post引用jieba方法
    1 留言 2019-04-12 12:23:32 奖励11点积分
  • ring0下通过OUT指令实现强制关机和重启

    背景通常情况下,我们可以通过调用 NtShutdownSystem 未导出的内核 API 函数,来在内核层下实现计算机的关机和重启等操作。 这种实现方式虽然在内核层下实现,但是相对来说,还是太过于表层了。
    本文要介绍一种在内核层下实现的强制关机以及重启的方法,即通过汇编 OUT 指令,控制端口输出,从而实现计算机的关机和重启。这种方式,较为低层,而且更难防御和监控。现在,我就把程序实现过程和原理,整理成文档,分享给大家。
    实现原理我们通过汇编 OUT 指令,控制端口的输出,从而实现控制计算机关机或者重启。
    对于实现关机操作,则可以向 1004H 端口,写入 2001H,汇编代码为:
    mov ax, 2001h mov dx, 1004h out dx,ax ret
    则,对应的机器码为:
    {0x66,0xB8,0x01,0x20,0x66,0xBA,0x04,0x10,0x66,0xEF,0xC3}
    对于实现重启操作,则可以向 64H 端口,写入 0FEH,汇编代码为:
    mov al, 0FEh out 64h, al ret
    则,对应的机器码为:
    {0xB0,0xFE,0xE6,0x64,0xC3}
    所以,我们可以通过申请一块非分页内存,写入 Shellcode 数据,并声明函数指针,调用 Shellcode 数据并执行,从而实现相应的关机或者重启操作。当然,我们也可以直接写成汇编代码,但是,要注意 64 位系统下汇编代码要保存为 .asm 汇编文件,添加到工程中。
    编码实现强制关机// 强制关机BOOLEAN ShutdownForce(){// {0x66, 0xB8, 0x01, 0x20, 0x66, 0xBA, 0x04, 0x10, 0x66, 0xEF, 0xC3} UCHAR ShellCode[11] = { 0x66, 0xB8, 0x01, 0x20, 0x66, 0xBA, 0x04, 0x10, 0x66, 0xEF, 0xC3 }; ULONG ulShellcodeSize = 11;#ifdef _WIN64 // 64 位 typedef VOID(__fastcall *typedef_SHUTDOWNFUNC)();#else // 32 位 typedef VOID(__stdcall *typedef_SHUTDOWNFUNC)();#endif // 申请内存 typedef_SHUTDOWNFUNC ShutdownFunc = (typedef_SHUTDOWNFUNC)ExAllocatePool(NonPagedPool, ulShellcodeSize); if (NULL == ShutdownFunc) { DbgPrint("ExAllocatePool Error!\n"); return FALSE; } // 写入数据 RtlCopyMemory(ShutdownFunc, ShellCode, ulShellcodeSize); // 执行Shellcode代码实现关机 ShutdownFunc(); // 释放内存 ExFreePool(ShutdownFunc); return TRUE;}
    强制重启// 强制重启BOOLEAN RebootForce(){ // {0xB0, 0xFE, 0xE6, 0x64, 0xC3} UCHAR ShellCode[11] = { 0xB0, 0xFE, 0xE6, 0x64, 0xC3 }; ULONG ulShellcodeSize = 5;#ifdef _WIN64 // 64 位 typedef VOID(__fastcall *typedef_REBOOTFUNC)();#else // 32 位 typedef VOID(__stdcall *typedef_REBOOTFUNC)();#endif // 申请内存 typedef_REBOOTFUNC RebootFunc = (typedef_REBOOTFUNC)ExAllocatePool(NonPagedPool, ulShellcodeSize); if (NULL == RebootFunc) { DbgPrint("ExAllocatePool Error!\n"); return FALSE; } // 写入数据 RtlCopyMemory(RebootFunc, ShellCode, ulShellcodeSize); // 执行Shellcode代码实现重启 RebootFunc(); // 释放内存 ExFreePool(RebootFunc); return TRUE;}
    程序测试在 Win7 32 位系统下,驱动程序正常运行;在 Win10 64 位系统下,驱动程序正常运行。
    总结对于这个层序,虽然编码较为简单,但是要想兼容 32 位和 64 位计算机,则需要特别注意一个问题:
    在声明函数指针的时候,对于 32 位函数,调用约定为 __stdcall;对于 64 位函数,调用约定为 __fastcall。
    所以,一定要注意不同系统位数下,函数的调用约定声明。否则,会出现错误。
    1 留言 2019-04-15 16:24:29
  • 《Windows黑客编程技术详解》勘误——13.3 文件管理之NTFS解析

    在内核篇第13章的第3小节“文件管理之NTFS解析”一文中,由于有网友测试反馈说该小结代码偶尔会出现定位不到存在的文件问题!经过我的排查,是由于配套代码中,多个Data Run的处理逻辑有问题!
    现在为了方便大家理解修改后的源代码,我在此补充下多个Data Run的处理说明。
    在NTFS文件系统中,如果数据很大的话,通常会使用Data Run来记录开辟的新空间,而且这些数据可能会不连续,所以就会出现多个Data Run的情况。
    Data Run的含义分析如下:

    Data Run的第一个字节分高4位和低4位。其中,高4位表示文件内容的起始簇号在Data Run List中占用的字节数。低4位表示文件内容簇数在Data Run List中占用的字节数。
    Data Run的第二个字节开始表示文件内容的簇数,接着表示文件内容的起始簇号。
    Data Run可以指示空间的大小以及偏移位置,例如上述中给的例子,起始簇号为:A2 59 00(10639616),数据大小为:C0 14(49172)。
    现在需要补充的知识点是针对多个Data Run的,如下所示!!!
    对于多个Data Run的情况,第一个Data Run的起始簇号是一个正整数,而第二个Data Run开始,起始簇号偏移是分正负的。可以根据起始簇号偏移的最高位是否来判断,若为1,则是负整数(补码表示);否则,是正整数。而且,从第二个Data Run开始,起始簇号偏移都是相对于上一个Data Run的起始簇号来说的。下面举个例子,方便大家理解。
    例如,有这么一个Data Run如下所示:
    31 01 FD 0A 28 21 01 AB FA 21 01 4A F5 21 01 91 C1 00我们可以看到上面一共有4个Data Run,分别如下:
    第1个Data Run

    31 01 FD 0A 28
    正整数:第一个Data Run的起始簇号都是正整数起始簇号:28 0A FD(2624253)

    第2个Data Run

    21 01 AB FA
    负整数:起始簇号偏移FA AB的最高位是1,所以是负整数(补码),所以FA AB(-1365)起始簇号:相对于上一个Data Run的偏移,所以为:2624253-1365=2622888

    第3个Data Run

    21 01 4A F5
    负整数:起始簇号偏移F5 4A的最高位是1,所以是负整数(补码),所以F5 4A(-2742)起始簇号:相对于上一个Data Run的偏移,所以为:2622888-2742=2620146

    第4个Data Run

    21 01 91 C1
    负整数:起始簇号偏移C1 91的最高位是1,所以是负整数(补码),所以C1 91(-15983)起始簇号:相对于上一个Data Run的偏移,所以为:2620146-15983=2604163

    配套代码修正对该小节配套代码中的HandleAttribute_A0函数和FileContentOffset函数的Data Run处理修改为上述补充的多个Data Run处理:
    计算Data Run的其实簇号代码修改为如下所示:
    if (0 == llClusterOffet){ // 第一个Data Run for (DWORD i = bHi; i > 0; i--) { liDataRunOffset.QuadPart = liDataRunOffset.QuadPart << 8; liDataRunOffset.QuadPart = liDataRunOffset.QuadPart | lpBuffer[wAttributeOffset + wIndxOffset + dwCount + bLo + i]; }}else{ // 第二个及多个Data Run // 判断正负 if (0 != (0x80 & lpBuffer[wAttributeOffset + wIndxOffset + dwCount + bLo + bHi])) { // 负整数 for (DWORD i = bHi; i > 0; i--) { // 补码的原码=反码+1 liDataRunOffset.QuadPart = liDataRunOffset.QuadPart << 8; liDataRunOffset.QuadPart = liDataRunOffset.QuadPart | (BYTE)(~lpBuffer[wAttributeOffset + wIndxOffset + dwCount + bLo + i]); } liDataRunOffset.QuadPart = liDataRunOffset.QuadPart + 1; liDataRunOffset.QuadPart = 0 - liDataRunOffset.QuadPart; } else { // 正整数 for (DWORD i = bHi; i > 0; i--) { liDataRunOffset.QuadPart = liDataRunOffset.QuadPart << 8; liDataRunOffset.QuadPart = liDataRunOffset.QuadPart | lpBuffer[wAttributeOffset + wIndxOffset + dwCount + bLo + i]; } }}// 注意加上上一个Data Run的逻辑簇号(第二个Data Run可能是正整数、也可能是负整数(补码表示), 可以根据最高位是否为1来判断, 若为1, 则是负整数, 否则是正整数)liDataRunOffset.QuadPart = llClusterOffet + liDataRunOffset.QuadPart;llClusterOffet = liDataRunOffset.QuadPart;
    修改后的源码,重新上传在下面的附件中了!!!若对书中内容有疑惑或者发现错误,可以直接戳下面的勘误收集链接哦
    https://www.write-bug.com/article/1966.html
    1 留言 2019-01-21 13:13:32
  • 基于WinPcap实现的UDP发包程序 精华

    背景一天,一位同学打电话给我说,让我帮忙开发一个基于WinPcap工具的UDP发包工具,还特地叮嘱是基于WinPcap,不要原始套接字Raw Socket。而且,时间只有一个白天,它晚上就要,而打电话给我的时候,已经临近中午了。我一听,同学一场,那就举手之劳吧。
    之前,自己就是用WinPcap开发过一些小程序,例如网卡遍历程序、Arp欺骗攻击程序等,所以,对WinPcap还算是熟悉。做这样的UDP发包程序,应该倒也不是那没事。
    现在,为了配合这篇文章的讲解,我特地对这个程序重新开发。把程序的实现原理和过程整理成文档,分享给大家。
    使用VS2013配置WinPcap开发环境程序是使用VS2013开发的,程序中要使用WinPcap工具提供的功能函数的话,就需要将对VS2013进行配置,将WinPcap库导入到程序中,现在,就先介绍WinPcap环境的配置过程:
    1.下载并安装WinPcap运行库http://www.winpcap.org/install/default.htm 。一些捕包软件会捆绑安装WinPcap,MentoHust也会附带WinPcap,这种情况下一般可以跳过此步。
    2.下载WinPcap开发包http://www.winpcap.org/devel.htm ,解压到纯英文路径。
    3.打开VS工程项目,在VS工程项目的 属性 —> VC++目录 中,包含目录 选项添加WpdPack\Include 目录,在 库目录 选项中添加 WpdPack\Lib 目录。
    4.在 属性 —> C/C++ —> 预处理器 中,添加 WPCAP 和 HAVE_REMOTE 这两个宏定义。
    5.在VS工程项目的 属性 —> 链接器 —> 输入 中,添加 wpcap.lib 和 ws2_32.lib 两个库。
    这样,就可以将WinPcap所需的库文件包含到工程项目中了。接下来,我们就开始讲解UDP发包程序的实现过程。
    实现过程1. 获取网卡设备列表首先,我们调用WinPcap函数pcap_findalldevs_ex获取设备列表信息,函数会将计算机上所有网卡设备信息返回到指向pcap_if_t结构体指针里。我们只需要对这个结构体指针进行遍历,就可以获取每个设备的详细信息。
    // 获取网卡设备列表 if (-1 == pcap_findalldevs_ex(PCAP_SRC_IF_STRING, NULL, &alldevs, szErr)) { ShowError("pcap_findalldevs_ex"); return; }
    程序将每个设备的信息显示在界面上,供用户选中使用哪个网卡进行操作。
    2. 设置网卡信息并打开网卡我们使用 pcap_open 函数来设置并打开网卡,函数中第 1 个参数stAdapterInfo. szAdapterName,表示网卡的名称;第 2 个参数 65535,表示设置保留数据包的长度,65535即每个数据包的前65535字节长度的数据被保留在缓冲区中;第 3 个参数 PCAP_OPENFLAG_DATATX_UDP 表示使用UDP协议来处理数据传输;第 4 个参数 1 ,表示以毫秒为单位的读取超时时间。读取超时用来处理,捕获一个数据包后,读操作并不必需要立即返回的情况。但这可能等待一些时间以允许捕获更多的数据包,这样用户层的一次读操作就可从操作系统的内核中读取更多的数据包。第 5 个参数为 NULL;第 6 个参数 errbuf,可以获取返回的出错信息。
    // 打开网卡 m_adhandle = pcap_open(stAdapterInfo.szAdapterName, 65535, PCAP_OPENFLAG_DATATX_UDP, 1, NULL, errbuf); if (NULL == m_adhandle) { ShowError("pcap_open"); return; }
    3. 构造UDP数据包我们根据源MAC地址、目的MAC地址、源IP地址、目的IP地址、源端口、目的端口、数据内容以及数据内容长度,构造UDP数据包。具体构造过程如下:

    首先,构造以太网帧头。
    memcpy((void*)FinalPacket, (void*)DestinationMAC, 6); memcpy((void*)(FinalPacket + 6), (void*)SourceMAC, 6); USHORT TmpType = 8; memcpy((void*)(FinalPacket + 12), (void*)&TmpType, 2);
    然后,构造IP头。
    memcpy((void*)(FinalPacket + 14), (void*)"\x45", 1); memcpy((void*)(FinalPacket + 15), (void*)"\x00", 1); TmpType = htons(TotalLen); memcpy((void*)(FinalPacket + 16), (void*)&TmpType, 2); TmpType = htons(0x1337); memcpy((void*)(FinalPacket + 18), (void*)&TmpType, 2); memcpy((void*)(FinalPacket + 20), (void*)"\x00", 1); memcpy((void*)(FinalPacket + 21), (void*)"\x00", 1); memcpy((void*)(FinalPacket + 22), (void*)"\x80", 1); memcpy((void*)(FinalPacket + 23), (void*)"\x11", 1); memcpy((void*)(FinalPacket + 24), (void*)"\x00\x00", 2); memcpy((void*)(FinalPacket + 26), (void*)&SourceIP, 4); memcpy((void*)(FinalPacket + 30), (void*)&DestIP, 4);
    接着,构造UDP头。
    TmpType = htons(SourcePort); memcpy((void*)(FinalPacket + 34), (void*)&TmpType, 2); TmpType = htons(DestinationPort); memcpy((void*)(FinalPacket + 36), (void*)&TmpType, 2); USHORT UDPTotalLen = htons(UserDataLen + 8); memcpy((void*)(FinalPacket + 38), (void*)&UDPTotalLen, 2); memcpy((void*)(FinalPacket + 42), (void*)UserData, UserDataLen);

    要注意的是,IP校验和以及UDP的校验和计算。这样,我们就可以成功构造UDP的数据包,接下来,就可以对数据包进行发送。
    4. 发送UDP数据包我们使用 pcap_sendpacket 函数发送单个数据包,第 1 个参数表示打开网卡的时候获取的句柄;第 2 个参数就是发送的数据内容;第 3 个参数表示发送数据内容的长度。注意,缓冲数据将直接发送到网络,而不会进行任何加工和处理。这就意味着应用程序需要创建一个正确的协议首部,来使这个数据包更有意义。
    // 发送数据包 if (0 != pcap_sendpacket(m_adhandle, FinalPacket, (UserDataLen + 42))) { char *szErr = pcap_geterr(m_adhandle); ShowError(szErr); return; }
    经过上面 4 步操作,便可以实现使用 WinPcap 发送 UDP 数据包了。要注意IP校验和以及UDP校验和的计算,这两个的值注意不要算错。
    程序测试我们以管理员权限运行程序,并对一个UDP程序发包,观察UDP程序能否接收到我们发包程序发送的UDP数据包。其中,UDP程序使用的是《Socket通信之UDP通信小程序》这篇文章中实现的UDP程序。
    经过测试结果,UDP程序成功接收到UDP数据包。

    总结这个程序是基于WinPcap实现的,所以,可能有很多人之前还没有接触过WinPcap方面的知识,所以,一下子使用和理解起来就比较困难。关键是耐下心来,把程序中调用到的不理解的WinPcap函数,在网上查找说明,对函数理解清晰。
    其中,要注意的是,使用WinPcap工具,需要有管理员权限。
    参考参考自《Windows黑客编程技术详解》一书
    3 留言 2018-11-28 11:22:48 奖励40点积分
  • 网络爬虫技术原理介绍

    什么是爬虫?网络爬虫是一种按照一定的规则,自动的爬取万维网信息的程序或者脚本。网络爬虫按照系统结构和实现技术,大致分为通用爬虫,聚焦爬虫,增量式爬虫,深层网络爬虫。但是实际应用中一般都是几种爬虫技术相结合实现的。
    搜索引擎其实是一种大型的复杂的网络爬虫属于通用爬虫的范畴。专业性质的搜索引擎则是属于聚焦爬虫的范畴。增量式爬虫则是对已下载的网页采取增量式更新和只爬取新产生的或者已经发生变化的网页的爬虫。Web网页按照存在的方式可以分为表层网页和深层网页,表层网页通常是指传统引擎可以索引的页面,以超链接可以到达的静态网页为主构成的Web页面;深层网络是那些大部分内容不能通过静态链接获取的、隐藏在搜索表单后的,只有用户提交一些关键词才能获取的Web页面。爬取深层网络页面的爬虫就属于深层网络爬虫的范畴。
    爬虫的工作流程

    首先选取一部分或者一个精心挑选的中字URL
    将这些URL放入带抓取的URL队列
    从待抓取的URL队列中读取待抓取的URL,解析DNS,并且得到主机的IP,并将URL对应的网页下载下来,存储到已下载的网页数据中。并且将这些URL放进已抓取的URL队列中
    分析已抓取URL队列中URL,从已下载的网页数据中分析出其他的URL,并和已抓取的URL进行去重处理,最后将去重后的URL放入待抓取URL队列,从而进入下一个循环

    爬虫的python实现框架Scrapy爬虫的python实现的库有很多,主要有urllib,httplib/urllib,requests.这里主要讲一下Scrapy框架。
    Scrapy使用python写的Crawler Framwork,简单轻巧,并且非常方便。Scrapy使用的是Twisted异步网络库来处理网络通信,架构清晰,并且包含了各种中间件接口,可以灵活的完成各种需求。


    引擎打开一个网站,找到处理改网站的spider并向该spider请求第一个要爬取的URL
    引擎从Spider中获取的第一个要爬取的URL并通过调度器(Scheduler)以Requests进行调度
    引擎向调度器(Scheduler)请求下一个要爬取的URL
    调度器(Scheduler)返回下一个要爬取的URL给引擎(Scrapy Engine),引擎将URL通过下载器中间件(Downloader Middleware)转发给下载器
    一旦页面下载完毕,下载器生成一个该页面的Response,并将其通过下载器中间件转发给引擎
    引擎从下载器中间件接收到Response并通过spider中间件发送给spider处理
    Spider处理Response并返回爬取到的Item和新的Request给引擎
    引擎将爬取到的item给Item Pipeline,将Request给调度器
    从第2步开始重复直到调度器中没有Request,引擎关闭该网站
    5 留言 2019-04-05 15:45:45 奖励15点积分
  • 大数据 Spark平台5-3、spark-sql

    前文链接:https://write-bug.com/article/2361.html
    Spark-Sql官网:http://spark.apache.org/docs/latest/sql-getting-started.html
    这里对Spark家族进一步介绍,偏入门实践,优化概念会少一些。
    我们在学习Hive时,本质上是把sql语句转换成MR job,节省工作时间,提升效率,其在存储数据时,分为这几个层次:table / partition / bucket / hdfs
    spark sql同样也处理结构化数据,把数据源传来的数据用表格的形式解析并维护起来,与此同时也可和Hive结合使用(数据存储在Hive中)
    在spark streaming中我们通常开发一个模板——Dstream, SparkStreamingContext
    spark SQL同样也有类似的概念——DataFrame, 当成一个table(关系型表格)

    外部数据源(SQLContext):HDFS、网络接口、Mysql等
    Hive数据源(HiveContext):Hive

    两者关系:HiveContext继承自SQLContext,HiveContext只能支持HQL,SQLContext支持语法更多
    DataFrame(由很多RDD组成)让数据处理更为简单,是一个分布式的Table

    与RDD区别:传统RDD以行为单位读数据,DataFrame基于列的内部优化
    与RDD相同点:都具备懒惰机制(基于RDD的抽象)


    Spark SQL处理核心:Catalyst工作流程(本质:把sql、dataframe相结合,以树tree的形式来存储、优化)

    把sql语句和Dataframe输入形成未解析的逻辑计划,加上元数据Catalog的支持,形成逻辑计划,再经过优化,形成物理计划,最后再通过代价模型转化成可执行代码。
    优化点

    基于规则

    一种经验式、启发式优化思路(如sql语句优化)join算子——两张表做join
    外排
    大循环外排:A、B,两张表都很大,O(M*N)——不用游标式外排:归并排序(指针滑动比大小)
    内排:小表放内存,大表做遍历(hive中的mapside join)

    基于代价

    评估每种策略选择的代价,根据代价估算,确定代价最小的方案代价估算模型——调整join的顺序,减少中间shuffle数据的规模

    catalyst工作流程

    parser:针对sql解析
    词法分析:讲输入的sql语句串解析为一个一个的token
    语法分析:再词法分析基础上,将单词序列组合成各类语法短语,组成各个LogicPlan

    SELECT sum(v) FROM( SELECT score.id, 100+80+score.math_score+score.english_score AS vFROM people JOIN scoreWHERE people.id=score.id AND people.age>10 ) a

    analyzer:借助元数据(catalog)解析根据元数据表解析为包含必要列的表,并且相应字段解析为相应的数据类型,相应的计算逻辑解析为对应的函数


    optimizer:基于规则的优化策略经典规则:谓词下推(Predicate Pushdown)、常量累加(Constant Folding)和列值裁剪(Column Pruning)谓词下推:把过滤条件放在join之前执行

    常量累加(180)、列值裁剪(提前过滤掉不用的列值字段):


    物理计划:基于代价的优化策略用物理操作算子产生一个或者多个物理计划。然后用cost模型选择一个物理计划。目前基于cost-based的优化仅仅用 于选择join算法:对已知的很小的relations,sparksql会选择使用spark的提供的点对点的广播功能实现Broadcast join
    执行计划查看方式:

    Spark网页sql
    sql语句后面追加 . queryExecution方法查看

    官方:catalyst优化引擎,执行时间减少75%
    内存管理:Tungsten 内存管理器—— off-heap(堆外内存)

    本质:突破JVM内存管理限制,分配堆外内存(GC、与磁盘做交换dump)
    JVM:GC带来时间开销,可能出现“假死”情况

    实践
    1、基本demo:
    读数据:
    1)从hdfs的原始text中读数据:sqlTest
    //建立学生表Schemeval StudentSchema: StructType = StructType(mutable.ArraySeq( StructField("Sno", StringType, nullable = false), StructField("Sname", StringType, nullable = false), StructField("Ssex", StringType, nullable = false), StructField("Sbirthday", StringType, nullable = true), StructField("SClass", StringType, nullable = true) ))val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sqltest") val sc = new SparkContext(sparkConf)//sqlContext入口 val sqlContext = new org.apache.spark.sql.SQLContext(sc)//RDD导入并解析数据 val StudentData = sc.textFile("hdfs://master:9000/sql_stu.data").map{ lines => val line = lines.split(",") Row(line(0),line(1),line(2),line(3),line(4)) }//把RDD数据和Schema通过sqlContext维护起来形成DataFrame val StudentTable = sqlContext.createDataFrame(StudentData, StudentSchema) StudentTable.registerTempTable("Student")//表名//sql语句使用 sqlContext.sql("SELECT Sname, Ssex, SClass FROM Student").show()2)从hdfs的原始text中读数据(json串):sqlJsonText
    val personInfo = sqlContext.read.json("hdfs://master:9000/person_info.json")//json串中的数据都是维护好的数据,不需要schema personInfo.registerTempTable("personInfo") sqlContext.sql("SELECT id, name, age FROM personInfo").show() println(personInfo.schema)3)从hive中读数据:sqlHiveTest
    启动mysql:]# systemctl start mariadb(hive中元数据)
    终端submit需jar包:lib/mysql-connector-java-5.1.41-bin.jar
    val hiveContext = new HiveContext(sc) hiveContext.table("movie_table").registerTempTable("movie_table")//可对现有表直接进行操作 hiveContext.sql("SELECT movieid, title FROM movie_table").show()2、UDF相关操作:
    1)udf:单条记录处理(map):sqlUdf
    sqlContext.udf.register("strlen", (input: String) => input.length)//函数名字注册,及简单实现功能 val personInfo = sqlContext.read.json("hdfs://master:9000/person_info.json") personInfo.registerTempTable("personInfo") sqlContext.sql("SELECT id, name, strlen(name), age FROM personInfo").show()//字段套用函数2)udaf:聚合场景(groupby)
    例子:每一个打分背后,有多少人参与
    class WordcountUdaf extends UserDefinedAggregateFunction { // 该方法指定具体输入数据的类型 override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true))) //在进行聚合操作的时候所要处理的数据的结果的类型 override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true))) //指定UDAF函数计算后返回的结果类型 override def dataType: DataType = IntegerType // 确保一致性 一般用true override def deterministic: Boolean = true //在Aggregate之前每组数据的初始化结果 override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) =0} // 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算 // 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getAs[Int](0) + 1 } //最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0) } //返回UDAF最后的计算结果 override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)}val hiveContext = new HiveContext(sc) hiveContext.table("rating_table").registerTempTable("rating_table") hiveContext.udf.register("strlen", (input: String) => input.length) hiveContext.udf.register("wordCount", new WordcountUdaf)//注册 hiveContext.sql("select rating, wordCount(rating) as count, strlen(rating) as length" + " from rating_table group by rating").show()//这里两函数可做个对比3、终端
    /usr/local/src/spark-2.0.2-bin-hadoop2.6/bin/spark-sql \--master local[2] \--jars /usr/local/src/spark-2.0.2-bin-hadoop2.6/lib/mysql-connector-java-5.1.41-bin.jar测试与hive数据:
    select rating, count(*) from rating_table_ex group by rating limit 100;4、streaming+sql:sqlAndStreamingWC
    nc -l 9999//单例模式object SQLContextSingleton { @transient private var instance: SQLContext = _ def getInstance(sparkContext: SparkContext): SQLContext = { if (instance == null) { instance = new SQLContext(sparkContext) } instance }}if (args.length < 2) { System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1) } val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sqlAndStreamingWC") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(30)) val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) words.foreachRDD((rdd: RDD[String], time: Time) => { val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._//每条数据都用一个对象操作内存,复用性 val wordsDataFrame = rdd.map(w => Record(w)).toDF() wordsDataFrame.registerTempTable("words") val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") println(s"========= $time =========") wordCountsDataFrame.show() }) ssc.start() ssc.awaitTermination()5、streaming+sql + hbase:streamSqlHbase
    nc -l 9999object HbaseHandler { def insert(row: String, column: String, value: String) { // Hbase配置 val tableName = "sparkstream_kafkahbase_table" // 定义表名 val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2") hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") hbaseConf.set("hbase.defaults.for.version.skip", "true") val hTable = new HTable(hbaseConf, tableName) val thePut = new Put(row.getBytes) thePut.add("info".getBytes,column.getBytes,value.getBytes) hTable.setAutoFlush(false, false) // 写入数据缓存 hTable.setWriteBufferSize(3*1024*1024) hTable.put(thePut) // 提交 hTable.flushCommits() }}//从套接字中读取到的信息遍历解析lines.foreachRDD((rdd: RDD[String], time: Time) => { val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._ val wordsDataFrame = rdd.map{ x=> (x.split(" ")(0),x.split(" ")(1),x.split(" ")(2)) }.map(w => (w._1, w._2, w._3)).toDF("key", "col", "val") wordsDataFrame.registerTempTable("words") val wordCountsDataFrame = sqlContext.sql("select key, col, val from words") println(s"========= $time =========") wordCountsDataFrame.show() //对dataframe行遍历插入 wordCountsDataFrame.foreach(x => HbaseHandler.insert( x.getAs[String]("key"), x.getAs[String]("col"), x.getAs[String]("val"))) })
    2 留言 2019-04-10 15:19:41 奖励10点积分
  • 基于WinInet的FTP文件下载实现 精华

    背景对于在网络之间的文件传输,我们通常使用FTP传输协议。因为,FTP就是专门为了文件传输而生的,传输效率高,稳定性好。所以,FTP文件传输协议,是我们网络传输中常用的协议。
    为了学习这方面的开发知识,自己专门写了个使用Windows提供的WinInet库实现了FTP的文件传输的基本功能。现在,我就把基于WinInet库实现的FTP文件下载和FTP文件上传分成两个文档分别进行解析。本文介绍的是基于WinInet的FTP文件下载的实现。
    函数介绍介绍FTP下载文件使用到的主要的WinInet库中的API函数。
    1. InternetOpen 介绍
    初始化一个应用程序,以使用 WinINet 函数。
    函数声明
    HINTERNET InternetOpen(In LPCTSTR lpszAgent,In DWORD dwAccessType,In LPCTSTR lpszProxyName,In LPCTSTR lpszProxyBypass,In DWORD dwFlags);
    参数lpszAgent指向一个空结束的字符串,该字符串指定调用WinInet函数的应用程序或实体的名称。使用此名称作为用户代理的HTTP协议。dwAccessType指定访问类型,参数可以是下列值之一:



    Value
    Meaning




    INTERNET_OPEN_TYPE_DIRECT
    使用直接连接网络


    INTERNET_OPEN_TYPE_PRECONFIG
    获取代理或直接从注册表中的配置,使用代理连接网络


    INTERNETOPEN_TYPE_PRECONFIG WITH_NO_AUTOPROXY
    获取代理或直接从注册表中的配置,并防止启动Microsoft JScript或Internet设置(INS)文件的使用


    INTERNET_OPEN_TYPE_PROXY
    通过代理的请求,除非代理旁路列表中提供的名称解析绕过代理,在这种情况下,该功能的使用



    lpszProxyName指针指向一个空结束的字符串,该字符串指定的代理服务器的名称,不要使用空字符串;如果dwAccessType未设置为INTERNET_OPEN_TYPE_PROXY,则此参数应该设置为NULL。
    lpszProxyBypass指向一个空结束的字符串,该字符串指定的可选列表的主机名或IP地址。如果dwAccessType未设置为INTERNET_OPEN_TYPE_PROXY的 ,参数省略则为NULL。
    dwFlags参数可以是下列值的组合:



    VALUE
    MEANING




    INTERNET_FLAG_ASYNC
    使异步请求处理的后裔从这个函数返回的句柄


    INTERNET_FLAG_FROM_CACHE
    不进行网络请求,从缓存返回的所有实体,如果请求的项目不在缓存中,则返回一个合适的错误,如ERROR_FILE_NOT_FOUND


    INTERNET_FLAG_OFFLINE
    不进行网络请求,从缓存返回的所有实体,如果请求的项目不在缓存中,则返回一个合适的错误,如ERROR_FILE_NOT_FOUND



    返回值成功:返回一个有效的句柄,该句柄将由应用程序传递给接下来的WinInet函数。失败:返回NULL。

    2. InternetConnect 介绍
    建立 Internet 的连接。
    函数声明
    HINTERNET WINAPI InternetConnect( HINTERNET hInternet, LPCTSTR lpszServerName, INTERNET_PORT nServerPort, LPCTSTR lpszUserName, LPCTSTR lpszPassword, DWORD dwService, DWORD dwFlags, DWORD dwContext);
    参数说明hInternet:由InternetOpen返回的句柄。lpszServerName:连接的ip或者主机名nServerPort:连接的端口。lpszUserName:用户名,如无置NULL。lpszPassword:密码,如无置NULL。dwService:使用的服务类型,可以使用以下

    INTERNET_SERVICE_FTP = 1:连接到一个 FTP 服务器上INTERNET_SERVICE_GOPHER = 2INTERNET_SERVICE_HTTP = 3:连接到一个 HTTP 服务器上
    dwFlags:文档传输形式及缓存标记。一般置0。dwContext:当使用回叫信号时, 用来识别应用程序的前后关系。返回值成功返回非0。如果返回0。要InternetCloseHandle释放这个句柄。

    3. FtpOpenFile介绍
    启动访问FTP服务器上的远程文件以进行读取或写入。
    函数声明
    HINTERNET FtpOpenFile( _In_ HINTERNET hConnect, _In_ LPCTSTR lpszFileName, _In_ DWORD dwAccess, _In_ DWORD dwFlags, _In_ DWORD_PTR dwContext);
    参数

    hConnect [in]处理FTP会话。
    lpszFileName [in]指向包含要访问的文件名称的以NULL结尾的字符串。
    dwAccess [in]文件访问。 该参数可以是GENERIC_READ或GENERIC_WRITE,但不能同时使用。
    dwFlags [in]转移发生的条件。 应用程序应选择一种传输类型,以及指示文件缓存如何被控制的任何标志。传输类型可以是以下值之一。




    VALUE
    MEANING




    FTP_TRANSFER_TYPE_ASCII
    使用FTP的ASCII(类型A)传输方法传输文件。 控制和格式化信息被转换为本地等价物。


    FTP_TRANSFER_TYPE_BINARY
    使用FTP的图像(类型I)传输方法传输文件。 文件完全按照存在的方式进行传输,没有任何变化。 这是默认的传输方式。


    FTP_TRANSFER_TYPE_UNKNOWN
    默认为FTP_TRANSFER_TYPE_BINARY。


    INTERNET_FLAG_TRANSFER_ASCII
    以ASCII格式传输文件。


    INTERNET_FLAG_TRANSFER_BINARY
    将文件作为二进制文件传输。



    以下值用于控制文件的缓存。 应用程序可以使用这些值中的一个或多个。



    VALUE
    MEANING




    INTERNET_FLAG_HYPERLINK
    在确定是否从网络重新加载项目时,如果没有到期时间并且没有LastModified时间从服务器返回,则强制重新加载。


    INTERNET_FLAG_NEED_FILE
    如果无法缓存文件,则导致创建临时文件。


    INTERNET_FLAG_RELOAD
    强制从源服务器下载所请求的文件,对象或目录列表,而不是从缓存中下载。


    INTERNET_FLAG_RESYNCHRONIZE
    如果资源自上次下载以来已被修改,请重新加载HTTP资源。 所有FTP资源都被重新加载。




    dwContext [in]指向包含将此搜索与任何应用程序数据相关联的应用程序定义值的变量。 这仅在应用程序已经调用InternetSetStatusCallback来设置状态回调函数时才会使用。
    返回值

    如果成功则返回一个句柄,否则返回NULL。 要检索特定的错误消息,请调用GetLastError。

    4. InternetReadFile 介绍
    函数声明
    BOOL InternetReadFile( __in HINTERNET hFile,__out LPVOID lpBuffer,__in DWORD dwNumberOfBytesToRead,__out LPDWORD lpdwNumberOfBytesRead);
    参数

    hFile[in]
    由InternetOpenUrl,FtpOpenFile, 或HttpOpenRequest函数返回的句柄.
    lpBuffer[out]
    缓冲器指针
    dwNumberOfBytesToRead[in]
    欲读数据的字节量。
    lpdwNumberOfBytesRead[out]
    接收读取字节量的变量。该函数在做任何工作或错误检查之前都设置该值为零

    返回值成功:返回TRUE,失败,返回FALSE

    实现原理首先,我们先介绍下FTP的URL格式:
    FTP://账号:密码@主机/子目录或文件例如:ftp://admin:123456@192.168.0.1/mycode/520.zip
    其中,“FTP”就表示使用FTP传输数据;“账号”即登录FTP服务器的用户名;“密码”即登录FTP服务器用户名对应的密码;“主机”表示服务器的IP地址;“子目录或文件”即要进行操作的文件或目录的路径。
    在,WinInet库中,提供了InternetCrackUrl这个函数专门用于URL的分解,在我写的《URL分解之InternetCrackUrl》则篇文章中有详细介绍和使用方法。
    那么,基于WinInet库的FTP文件下载的原理如下所示:

    首先,我们先调用对URL进行分解,从URL中获取后续操作需要的信息。
    然后,使用InternetOpen初始化WinInet库,建立网络会话。
    接着,使用InternetConnect与服务器建立连接,并设置FTP数据传输方式。
    之后,我们就可以调用FtpOpenFile函数,根据文件路径,以GENERIC_READ的方式,打开文件并获取服务器上文件的句柄。
    接着,根据文件句柄,调用FtpGetFileSize获取文件的大小,并根据文件大小在程序申请一块动态内存,以便存储下载的数据。
    然后,就可以调用InternetReadFile从服务器上下载文件数据,并将下载的数据存放在上述申请的动态内存中。
    最后,关闭上述打开的句柄,进行清理工作。

    这样,就可以成功实现FTP文件下载的功能了。与服务器建立FTP连接后,我们使用WinInet库中FTP函数对服务器上文件的操作就如果使用Win32 API函数对本地文件操作一样方便。
    编码实现导入WinInet库文件#include <WinInet.h>#pragma comment(lib, "WinInet.lib")
    FTP文件下载// 数据下载// 输入:下载数据的URL路径// 输出:下载数据内容、下载数据内容长度BOOL FTPDownload(char *pszDownloadUrl, BYTE **ppDownloadData, DWORD *pdwDownloadDataSize){ // INTERNET_SCHEME_HTTPS、INTERNET_SCHEME_HTTP、INTERNET_SCHEME_FTP等 char szScheme[MAX_PATH] = { 0 }; char szHostName[MAX_PATH] = { 0 }; char szUserName[MAX_PATH] = { 0 }; char szPassword[MAX_PATH] = { 0 }; char szUrlPath[MAX_PATH] = { 0 }; char szExtraInfo[MAX_PATH] = { 0 }; ::RtlZeroMemory(szScheme, MAX_PATH); ::RtlZeroMemory(szHostName, MAX_PATH); ::RtlZeroMemory(szUserName, MAX_PATH); ::RtlZeroMemory(szPassword, MAX_PATH); ::RtlZeroMemory(szUrlPath, MAX_PATH); ::RtlZeroMemory(szExtraInfo, MAX_PATH); // 分解URL if (FALSE == Ftp_UrlCrack(pszDownloadUrl, szScheme, szHostName, szUserName, szPassword, szUrlPath, szExtraInfo, MAX_PATH)) { return FALSE; } if (0 < ::lstrlen(szExtraInfo)) { // 注意此处的连接 ::lstrcat(szUrlPath, szExtraInfo); } HINTERNET hInternet = NULL; HINTERNET hConnect = NULL; HINTERNET hFTPFile = NULL; BYTE *pDownloadData = NULL; DWORD dwDownloadDataSize = 0; DWORD dwBufferSize = 4096; BYTE *pBuf = NULL; DWORD dwBytesReturn = 0; DWORD dwOffset = 0; BOOL bRet = FALSE; do { // 建立会话 hInternet = ::InternetOpen("WinInet Ftp Download V1.0", INTERNET_OPEN_TYPE_PRECONFIG, NULL, NULL, 0); if (NULL == hInternet) { Ftp_ShowError("InternetOpen"); break; } // 建立连接 hConnect = ::InternetConnect(hInternet, szHostName, INTERNET_INVALID_PORT_NUMBER, szUserName, szPassword, INTERNET_SERVICE_FTP, INTERNET_FLAG_PASSIVE, 0); if (NULL == hConnect) { Ftp_ShowError("InternetConnect"); break; } // 打开FTP文件, 文件操作和本地操作相似 hFTPFile = ::FtpOpenFile(hConnect, szUrlPath, GENERIC_READ, FTP_TRANSFER_TYPE_BINARY | INTERNET_FLAG_RELOAD, NULL); if (NULL == hFTPFile) { Ftp_ShowError("FtpOpenFile"); break;; } // 获取文件大小 dwDownloadDataSize = ::FtpGetFileSize(hFTPFile, NULL); // 申请动态内存 pDownloadData = new BYTE[dwDownloadDataSize]; if (NULL == pDownloadData) { break; } ::RtlZeroMemory(pDownloadData, dwDownloadDataSize); pBuf = new BYTE[dwBufferSize]; if (NULL == pBuf) { break; } ::RtlZeroMemory(pBuf, dwBufferSize); // 接收数据 do { bRet = ::InternetReadFile(hFTPFile, pBuf, dwBufferSize, &dwBytesReturn); if (FALSE == bRet) { Ftp_ShowError("InternetReadFile"); break; } ::RtlCopyMemory((pDownloadData + dwOffset), pBuf, dwBytesReturn); dwOffset = dwOffset + dwBytesReturn; } while (dwDownloadDataSize > dwOffset); } while (FALSE); // 返回数据 if (FALSE == bRet) { delete[]pDownloadData; pDownloadData = NULL; dwDownloadDataSize = 0; } *ppDownloadData = pDownloadData; *pdwDownloadDataSize = dwDownloadDataSize; // 释放内存并关闭句柄 if (NULL != pBuf) { delete []pBuf; pBuf = NULL; } if (NULL != hFTPFile) { ::InternetCloseHandle(hFTPFile); } if (NULL != hConnect) { ::InternetCloseHandle(hConnect); } if (NULL != hInternet) { ::InternetCloseHandle(hInternet); } return bRet;}
    程序测试在 main 函数中调用上述封装好的函数,进行测试。main 函数为:
    int _tmain(int argc, _TCHAR* argv[]){ BYTE *pDownloadData = NULL; DWORD dwDownloadDataSize = 0; // 下载 if (FALSE == FTPDownload("ftp://admin:123456789@192.168.0.1/Flower520.zip", &pDownloadData, &dwDownloadDataSize)) { printf("FTP Download Error!\n"); } // 将数据保存为文件 Ftp_SaveToFile("myftpdownloadtest.zip", pDownloadData, dwDownloadDataSize); // 释放内存 delete []pDownloadData; pDownloadData = NULL; printf("FTP Download OK.\n"); system("pause"); return 0;}
    测试结果
    运行程序,程序提示下载成功。然后,打开目下查看下载文件,成功下载文件。

    总结在打开Internet会话并和服务器建立连接后,接下来使用WinInet库中的FTP函数对服务器上的文件操作,就如同在自己的计算机上使用Win32 API函数操作一样。都是打开或者创建文件,获取文件句柄,然后根据文件句柄,调用函数对文件进行读写操作,最后,关闭文件句柄。
    所以,大家注意和本地的文件操作进行类比下,就很容易理解了。
    参考参考自《Windows黑客编程技术详解》一书
    2 留言 2018-12-09 14:46:21 奖励18点积分
  • 基于Crypto++计算数据或文件的SHA256值

    背景写了一个基于Crypto++加密库中实现计算文件和数据的SHA256值的一个小程序,Crypto++加密库就不详细介绍了,这个库提供了很多知名的加解密算法,直接调用就好了,使用起来还是比较方便的。
    写这篇文章,就是分享自己的学习心得。自己的密码学部分的知识学得不怎么好,还好有Crypto++开源库可以使用,弥补了对加解密部分的不足。现在,向大家分享使用Crypto++中的SHA256模块实现文件和数据的SHA256值计算方法。
    程序编译设置注意事项首先,先要下载Crypto++库的开源代码,然后,自己编译得到Crypto++的库文件。下载链接还有具体的编译步骤,可以参考分享文章 “使用VS2013编译Crypto++加密库“,里面有详细介绍。
    在导入Crypto++的库文件到自己的工程项目的时候,要对自己的工程项目进行编译设置。主要一点就是:项目工程的属性中的“运行库”设置,要与编译Crypto++的库文件时设置的“运行库”选项要对应一致,否则程序会编译不过的。也就是要检查LIB库工程和本测试工程的:属性 —> C/C++ —> 代码生成 —> 运行库 是否统一。
    如果编译出错,报告XX重复定义等错误,同样,要检查LIB库工程和本测试工程的:属性 —> C/C++ —> 代码生成 —> 运行库 是否统一。
    实现原理计算文件和数据的SHA256值,它们的实现核心代码为:
    FileSource(pszFileName, true, new HashFilter(sha256, new HexEncoder(new StringSink(value))));
    StringSource(pData, dwDataSize, true, new HashFilter(sha256, new HexEncoder(new StringSink(value))));
    这两行代码总共用了4个类 StringSink、HexEncoder、HashFilter、FileSource 和 StringSource。而且,这两行代码的主要区别就是 FileSource 和 StringSource 的区别。FileSource 的第一个参数只需要传入文件路径名称即可,而 StringSource 第一第二个参数分别表示数据首地址指针和数据大小。
    其它参数表示的意义都是相同的,首先用类 StringSink 添加到一个 string 对象缓冲区,接着用类 HexEncoder 把这个缓冲区转换为 16 进制。其中,计算 Hash 值主要用到类 HashFilter。FileSource 类是把要计算 Hash 值的文件 filename 进行一定的转换放入临时缓冲区,然后调用实例化的 HashFilter 对其进行计算相应 Hash 函数的 Hash值,并把 Hash 值返回到缓冲区中。而 StringSource 类是把要计算 Hash 值的数据直接传递给 HashFilter,然后调用实例化的 HashFilter 对其进行计算相应 Hash 函数的Hash值,并把 Hash 值返回到缓冲区中。
    编码实现计算文件的SHA256// 计算文件的 SHA256 值string CalSHA256_ByFile(char *pszFileName){ string value; SHA256 sha256; FileSource(pszFileName, true, new HashFilter(sha256, new HexEncoder(new StringSink(value)))); return value;}
    计算数据的SHA256// 计算数据的 SHA256 值string CalSHA256_ByMem(PBYTE pData, DWORD dwDataSize){ string value; SHA256 sha256; StringSource(pData, dwDataSize, true, new HashFilter(sha256, new HexEncoder(new StringSink(value)))); return value;}
    程序测试我们运行程序,分别使用文件和数据两种方式来计算 520.exe 的 SHA256 值,结果两个值都相同:

    总结这个程序不是很复杂,使用起来比较简单。大家在使用的过程中,注意区分下文件的计算和数据的计算之间的差别即可。
    参考参考自《Windows黑客编程技术详解》一书
    2 留言 2019-01-14 17:16:44 奖励10点积分
  • 【Cocos Creator实战教程(12)】——存储与读取数据 精华

    1. 相关知识点我们在游戏中通常需要存储用户数据,如音乐开关、显示语言等,如果是单机游戏还需要存储玩家存档。 Cocos Creator 中我们使用 cc.sys.localStorage 接口来进行用户数据存储和读取的操作。
    1.1 存储数据cc.sys.localStorage.setItem(key, value)
    上面的方法需要两个参数,用来索引的字符串键值 key,和要保存的字符串数据 value。
    假如我们要保存玩家持有的金钱数,假设键值为 gold:
    cc.sys.localStorage.setItem('gold', 100);
    对于复杂的对象数据,我们可以通过将对象序列化为 JSON 后保存:
    userData = { name: 'Tracer', level: 1, gold: 100};cc.sys.localStorage.setItem('userData', JSON.stringify(userData));
    1.2 读取数据cc.sys.localStorage.getItem(key)
    和 setItem 相对应,getItem 方法只要一个键值参数就可以取出我们之前保存的值了。对于上文中储存的用户数据:
    var userData = JSON.parse(cc.sys.localStorage.getItem('userData'));
    1.3 移除键值对当我们不再需要一个存储条目时,可以通过下面的接口将其移除:
    cc.sys.localStorage.removeItem(key)
    1.4 数据加密对于单机游戏来说,对玩家存档进行加密可以延缓游戏被破解的时间。要加密存储数据,只要在将数据通过 JSON.stringify 转化为字符串后调用你选中的加密算法进行处理,再将加密结果传入 setItem 接口即可。
    您可以搜索并选择一个适用的加密算法和第三方库,比如 encryptjs, 将下载好的库文件放入你的项目,存储时:
    var encrypt=require('encryptjs');var secretkey= 'open_sesame'; // 加密密钥var dataString = JSON.stringify(userData);var encrypted = encrypt.encrypt(dataString,secretkey,256);cc.sys.localStorage.setItem('userData', encrypted);
    读取时:
    var cipherText = cc.sys.localStorage.getItem('userData');var userData=JSON.parse(encrypt.decrypt(cipherText,secretkey,256));
    注意 数据加密不能保证对用户档案的完全掌控,如果您需要确保游戏存档不被破解,请使用服务器进行数据存取。
    2. 步骤2.1 存储数据cc.sys.localStorage.setItem('bestRunScore', this.bestRunScore);cc.sys.localStorage.setItem('bestJumpScore', this.bestJumpScore);
    2.2 读取数据cc.sys.localStorage.getItem("bestRunScore");cc.sys.localStorage.getItem("bestJumpScore");
    就是两个封装的方法,存储的方式是键值对
    2.3 Menu脚本也需要修改一下Menu.js
    ...onLoad: function () { this.record = cc.find("Record").getComponent("Record"); this.runScore.string = "你最远跑了" + this.record.bestRunScore+ "m"; this.jumpScore.string = "你最高跳了" + this.record.bestJumpScore+ "m"; ...},...
    3. 总结在现实生活中,我们经常重要数据传回服务器,而一些不重要数据则存储在本地。而且现在json数据格式很普遍,php等都可以使用json。
    本教程部分素材来源于网络。
    2 留言 2018-12-01 20:11:07 奖励25点积分
  • 大数据Spark平台5-2、spark-streaming

    前文链接:https://write-bug.com/article/2091.html
    Spark Streaming官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html
    在之前我们已经对Storm流式计算框架和Spark-core核心计算引擎进行了介绍,以此为基础更好理解SparkStreaming。
    Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。
    对于大多数业务而言,这两种并没有很大差别:

    Storm的数据是类似水流式的流转数据,毫秒级别
    Spark Streaming的数据是类似离散化后的水状数据,秒级别

    数据来源:

    处理后的数据可以推送到文件系统,数据库和实时仪表板。
    在Spark-core中我们主要处理的其实就是不同的RDD算子(Transformation&action),实行懒惰机制。
    在这里我们的action类算子换为了output类算子,其实开发的时候也是有细微的不同,Spark Streaming针对Dstream开发处理结构。Dstream:每个RDD包含特定的时间间隔。
    这里文字说可能不太明白,这一张图就能直接明了。

    Spark Streaming接收实时输入数据流并将数据分成批处理,然后由Spark引擎处理以批量生成最终结果流。

    在这里与其说是对Dstream做开发不如说是把不同的RDD排列组合,再在不同的入口处声明批次秒数生成新的Dstream罢了。(即DStream中的各种操作是可以映射到内部的RDD上运行的)
    在transformation算子中Spark提供了窗口操作,例如:
    统计最近一个小时的PV量,要求每分钟更新。
    参数:

    window length - The duration of the window (3 in the figure).
    sliding interval - The interval at which the window operation is performed (2 in the figure).


    output内分两种算子:

    执行算子:foreachRDD:主要负责对接外部开发,Hbase,Kafka,Hive
    输出算子:print()、saveAsTextFiles(prefix, [suffix])、saveAsObjectFiles(prefix, [suffix])、saveAsHadoopFiles(prefix, [suffix])

    Streaming架构:


    master:分配任务(画Graph蓝图)
    worker:处理任务(接收、发送)
    client:喂数据

    模式:

    Recevier:被动接收数据-异步两线程
    direct:主动拉数据-同步

    容错—WAL:

    数据流从右侧进入,先在内存中顺序存储(有offset偏移量),再同步存储在磁盘文件系统中(如HDFS),executor处理后再把存储的元数据发送给AM,AM得到请求后并且已知数据存储结构就可以通过SSC入口处理数据,处理数据之前根据元数据把内存中的数据先同步过来,处理数据时可能有数据落地需求;与此同时,元数据结构和数据checkpoint形式存储在文件系统中比便数据恢复。
    重启恢复:

    实践:
    1.word count:
    无状态:不记录上一批次数据
    //定义入口,初始化配置val sparkConf = new SparkConf().setMaster("local[2]").setAppName("wordCount")val ssc = new StreamingContext(sparkConf, Seconds(5))//和之前的Spark-core入口不同,并且参数多了个批次时间设置ssc.checkpoint("hdfs://master:9000/hdfs_checkpoint")//可设置checkpoint点以便恢复数据在context被初始化后,你还需要做如下几点:

    通过input DStream来定义输入源
    通过DStream的转换操作和输出操作来定义流数据处理规则
    开始接受数据并处理:ssc.start()
    等待流处理终止(或者出现异常退出):ssc.awaitTermination()
    手动终止流处理:ssc.stop()

    //这里从一个TCP数据源接收流式数据,在这里我们需要指定主机和端口。还指定了存储等级:内存、磁盘、序列val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)//处理数据逻辑后想要真正开始启动任务调用的方法 ssc.start() ssc.awaitTermination()我们用netcat模拟服务器发送数据:
    //监听模式向9999端口发送数据nc -l 9999有状态的:记录上批次数据并累加
    //改变原来的reducebykey为updateStateByKey(updateFunction _)//updateFunction为自己开发的函数,即把前面批次数据和后面批次数据相加def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {//Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。//Option[T] 是一个类型为 T 的可选值的容器: 如果值存在, Option[T] 就是一个 Some[T] 即成功返回T类型,如果不存在, Option[T] 就是对象 None 。 val current = currentValues.sum val pre = preValues.getOrElse(0)// getOrElse() 方法来获取元组中存在的元素或者使用其默认的值 Some(current + pre)//Scala数据类型 }2.时间窗口
    这里只需要把我们的reducebykey算子改为:
    reduceByKeyAndWindow((v1: Int, v2:Int) => v1 + v2, Seconds(30), Seconds(10))每10秒更新一次数据,更新最近30秒钟的结果,后面10秒参数要和前面设置的批次时间相同;如果批次时间小于10秒,则更新数据时间和批次数据无关,如果大于10秒,则无论时间窗口怎样更新数据,都不会显示。
    3.Kafka+Streaming -wordcount
    conf 加:set(“spark.cores.max”, “8”)
    Recevier模式:
    如果改为local【1】个线程,将不会正常工作出结果。
    在有状态的基础上添加:
    //ReceiverInputDStream类型val zkQuorum = "master:2181,slave1:2181,slave2:2181"val groupId = "group_1"val topicAndLine: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("topic_1013" -> 1), StorageLevel.MEMORY_AND_DISK_SER) val lines: DStream[String] = topicAndLine.map{ x => x._2 }KafkaUtils.createDstream
    构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
    使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上

    创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量
    对于不同的group和topic可以使用多个receivers创建不同的DStream
    如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)

    Direct模式:
    val brokers = "192.168.88.101:9092"; val topics = "topic_1013"; val topicSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)KafkaUtils.createDirectStream
    区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api
    优点:

    简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。
    高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中
    恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

    数据挤压问题:
    数据挤压:下游处理速度慢(并发不够、处理速度慢)
    kafka -> streaming

    数据分布,调节offset——紧急
    并发调大,需要kafka配合(增加分区数),提高线程数量
    控制批次的规模—— max.poll.records
    控制数据处理时间(timeout)—— max.poll.interval.ms

    4.Kafka+Streaming+Kafka
    上面对接数据后,这里后面要对接外部服务,用到了前面说的执行算子foreachRdd:
    val array = ArrayBuffer[String]() lines.foreachRDD(rdd => {//遍历每批次数据 val count = rdd.count().toInt rdd.take(count + 1).take(count).foreach(x => {//遍历每条数据 array += x + "--read" }) ProducerSender(array) array.clear() })自己为producer发送数据:
    def ProducerSender(args: ArrayBuffer[String]): Unit = { if (args != null) { val brokers = "192.168.88.101:9092" // Zookeeper connection properties val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) val topic = "topic_1013"//可输出到不同的topic,相同会有好玩现象 // Send some messages for (arg <- args) { println("i have send message: " + arg) val message = new ProducerRecord[String, String](topic, null, arg) producer.send(message) } Thread.sleep(500) producer.close() }5.Kafka+Streaming+HbaseHBase 配置:
    object HbaseHandler { def insert(row: String, column: String, value: String) { // Hbase配置 val tableName = "sparkstream_kafkahbase_table" // 定义表名 val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2") hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") hbaseConf.set("hbase.defaults.for.version.skip", "true") val hTable = new HTable(hbaseConf, tableName) val thePut = new Put(row.getBytes) thePut.add("info".getBytes,column.getBytes,value.getBytes) hTable.setAutoFlush(false, false) // 写入数据缓存 hTable.setWriteBufferSize(3*1024*1024) hTable.put(thePut) // 提交 hTable.flushCommits() }}val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val line = lines.flatMap(_.split("\n")) val words = line.map(_.split("\\|")) words.foreachRDD(rdd => {//遍历批次数据 rdd.foreachPartition(partitionOfRecords => {//遍历Kafka分区数据 partitionOfRecords.foreach(pair => {//遍历每条记录 val key = pair(0) val col = pair(1) val value = pair(2) println(key + "_" + col + " : " + value) HbaseHandler.insert(key, col, value) }) }) })运行模式:

    idea中:注意pom中指定好版本文件
    Linux终端中:/bin/spark-submit —master local[2] (代码中也可指定), —class classname jar-path IP portLinux文件重定向:bash run.sh 1>1.log 2>2.log 便于查看数据
    Linux终端中Standard: —master spark: //master: 7077
    Linux终端中yarn模式: —master yarn-cluster

    后两种可指定参数:
    --num-executors 2 \--executor-memory 1g \--executor-cores 2 \--driver-memory 1g \
    2 留言 2019-04-08 17:49:12 奖励11点积分
  • 大数据日志系统10、Kafka

    前文链接:https://write-bug.com/article/2127.html
    Kafka我们前面刚学习了flume,他是一个海量日志收集系统,在生态系统中的定位为前端采集系统,通常我们会见到这样的经典组合Flume+Kafka+Storm+Hbase/HDFS,而Kafka在这里就起到一个承上启下的管道作用,定位为:分布式的消息队列系统。

    分布式消息队列系统,基于发布/订阅的系统

    分布式:支持水平拓展和高吞吐能力,像Flume一样可以拓展为很多机器共同协作发布/订阅:一种对外服务的功能,就像微信订阅号一样,发布者在一个话题下发布消息,读者也到这个话题下读取消息并且在读取过程中我们只是在订阅的话题下看到更新才去读取,但这个话题下的消息有可能是昨天发布的,所以我们并不是同步读取的,而是通过异步的方式,而这个话题就像一个缓冲器,暂时存储了发布者发布的信息,之后我们订阅者再从这里异步读取信息
    消息持久化:Kafka的一个特殊点,数据直接向磁盘中写,那我们可能会说其他的框架为了增加速度都是先把数据往内存中写再往磁盘中写,Kafka会不会很慢呢?这里Kafka做了大量优化,我们操作系统有两个特点:预读和后写

    预读:预先读取下一行后写:把多次写压缩成一次写内存快的特点:随机访问读写,学过数据结构的人都知道,最开始的数组可以通过顺序存储的方式,下标访问会更快速读写数据,虽然会占用一部分空间,但是比起大数据来说都是毛毛雨了,而Kafka对持久化优化的方式就是顺序存储,对磁盘的顺序访问,可以做大O(1)的方式进行数据处理(比内存访问还要快)
    实时性:生产者生产的message立即被消费者可见

    message:Kafka的数据单位,可对比HDFS的block,Flume中的event

    在之前我们要做一个数据统计和报表,我们是通过离线的方式用Hbase和Hive去做一个数据统计分析,但他的实时性太差了,维护离线数据的成本也很高,基于这一点,我们现有的消息队列系统都把目光放在了实时上,没有注重消息的持久化,那么我们需要一个既支持在线又支持离线的系统就是Kafka。
    组件
    Broker:每一台Kafka机器节点是一个Broker
    Producer:日志(即message)消息生产者,主要写数据
    Consumer:日志消息消费者,主要读数据



    Topic:是虚拟概念(前面说的话题),不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
    Partition:是实际概念,文件夹形式存在,是在Topic的基础上做了进一步分层,有些类似MR中的Partition,
    Partition功能:负载均衡,需要保证消息的顺序性

    顺序性的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的如果有多个partiton存在,可能会出现顺序不一致的情况,原因:每个Partition相互独立
    Partition有两部分组成:

    (1)index log:(存储索引信息,快速定位segment文件)(2)message log:(真实数据的所在)

    流程:
    Producer发布一个消息,并绑定一个Topic,Topic的物理概念是Partition文件夹来存储,而partition存在了不同的Broker中(hash%),从而Topic也就分布在了不同的Broker中(负载均衡)。
    当Broker的管道建好后,Producer可以进行写,Consumer进行读操作,这个读和写是同时进行的,当然这其中有一定延迟,而对于Broker来说,Producer和Consumer都是客户端,也就是说我读和写都是不需要顾及其他人的,只负责自己的功能,也就是Kafka对两端做异步解耦。
    在HDFS多副本的方式来完成数据高可用,如果设置一个Topic,假设这个Topic有5个Partition,3个replication。
    Kafka分配replication的算法:
    假设:
    将第i个Partition分配到(i % N)个Broker上
    将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
    虽然Partition里面有多个replication
    如果里面有M个replication,其中有一个是Leader,其他M-1个follower

    在Kafka集群中,和Strom/Hbase相似,也依赖了一个Zookeeper组件,在上图可以看出,zookeeper中存储了一些Broker和Consumer的信息,而Producer发布消息是要指定topic(Broker)的,不需要zookeeper去做一个分发,对于Consumer来说,需要用zookeeper去做一个负载均衡,比如说我有多个consumer,那我们把它用组的方式维护起来,这个Group组对于kafka来说就是一个逻辑概念,这时上游来个一个数据,如果我把这个数据每台真实consumer都存储一次这个数据可能对存储性能有很大的要求,那么这里我们就可以像Hive分库一样,每一个子节点存储一部分数据,可以做并行计算。
    同样的(id%hash)去分配数据做负载均衡。producer和broker之间是一个push模式,然后Consumer和producer之间是一个pull模式,push模式就是推,pull模式就是拉,那不管你是push模式还是pull模式都是由你的producer和Consumer进行主动的,就像producer有一个消息,我主动的把消息推给你的broker,然后Consumer我要消费了,那我主动的去在你的broker去取数据
    topic角度来看,上面说过topic相当于话题,也就是消息的类别,在物理上topic的数据分开在不同的broker上存储,而用户只需要指定消息的topic就可以了,而不需要关心消息存储到那台机器中,topic是一个逻辑概念,消息传递有一个基于时间顺序的先后顺序,那么就算分配到不同子节点中,也是局部有序(partition)的,这里有一个offset偏移量,记录着Consumer读取数据时的地址后移动的取值范围,类似文章的第几行,这种topic划分多个partition的方式可以有效地提高Kafka的吞吐率,而对于topic来说,无论消息有没有被消费,消息都会一直持久化,存储的时间默认是7天,以便后续下游出错可以回溯,下游的消费进度是不同的,而读取到哪里这个标记就是偏移量offset,考虑到Kafka性能的原因,把主导权给了consumer去维护,也就是自己保存自己的读取位置。
    Message:前面说过其为数据传递单位,producer生产消息追加在文件末尾,一旦追加后就不能改变,其消息格式为:
    message length:4bytes(1+4+n)“magic”value: 1bytecrc: 4bytespayload: 真实数据,nbytes
    Producer:在Kafka种有现成的demo可供使用,当然也可以直接用flume对接Kafka,这样flume就是一个producer。
    producer分为两种模式:
    producer.type=sync 同步 实时发送
    producer.type=async异步 数据来了后现在Kafka中暂存一部分,当达到一定条件,数据在被应用:

    时间
    数据量

    Consumer:同样在前面我们提到了组概念,即消费组(Consumer Group),这种更抽象的消费方式把消息分片成了几个部分存在各自的Consumer中,在搭建时,可设置多个消费组,而不管Kafka内部的几个partition对应了几个Group中的Consumer,对于Group全局来说我得到数据都是完整的。
    随着Producer的写入,Kafka内存文件不断增大,那么在这样的大文件中我怎么快速查找到我想要的数据呢,kafka内部是实际上是这样做的,它是用多个segment去把这个大的segment给它拆分,这样的话比如说我先写1号segment,写完之后再写2号segment,2号写完了写3号,所以你会发现一个特点,这个1号segment,2号segment,3号segment当然它还在不断的产生数据,而且产生的时候这个segment这个文件大小基本上也都是一致的,那随着这个数据的不断积累,那你这个segment也是不断的增多是吧?但是你越旧的数据或者越早的数据,是存到了前面数字越小的segment里面去,越新的数据就存在了这个数字的越大的segment这里面去。当我查历史数据时,我知道offset,但不知道具体在哪个segment上,这里基于Kafka的顺序存储,我们可以用二分法快速定位segment。

    传输效率:zero-copy。
    0拷贝:减少Kernel和User模式上下文的切换。直接把disk上的data传输给socket,而不是通过应用程序来传输。
    Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset);减轻Kafka的实现难度。
    Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
    交付保证:

    at least once:至少一次(会有重复、但不丢失)at most once:最多发送一次(不重复、但可能丢失)exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
    Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:

    日志副本:保证可靠性角色:主、从ISR:是一个集合,只有在集合中的follower,才有机会被选为leader如何让leader知道follower是否成功接收数据(心跳,ack)如果心跳正常,代表节点活着
    怎么算“活着”

    心跳如果follower能够紧随leader的更新,不至于被落的太远如果一旦挂掉,从ISR集合把该节点删除掉
    实践(需要把zookeeper提前启动好):
    一、单机版
    1、启动进程:
    ]# ./bin/kafka-server-start.sh config/server.properties2、查看topic列表:
    ]# ./bin/kafka-topics.sh --list --zookeeper localhost:21813、创建topic:
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test4、查看topic描述:
    ]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test5、producer发送数据:
    ]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test6、consumer接收数据:
    ]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning7、删除topic:
    ]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test二、集群版
    在slave1和slave2上的broker.id一定设置不同,分别在slave1和slave2上开启进程:
    ./bin/kafka-server-start.sh config/server.properties创建topic:
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test下面命令会创建失败:原因副本数超出实际机器个数
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 5 --topic newyear_many_test_2三、自主写producer、consumer
    1、实现一个consumer group
    首先在不同的终端分别开启consumer,保证groupid一致
    ]# python consumer_kafka.py]# python consumer_kafka.py执行一次producer:
    ]# python producer_kafka.py2、指定partition发送数据
    ]# python producer_kafka_2.py3、指定partition读出数据
    ]# python consumer_kafka_2.py四、Flume+Kafka
    1、启动flume:
    ]# ./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka_2.conf --name a1 -Dflume.root.logger=INFO,console发送:
    ]# for i in `seq 1 100`; do echo '====> '$i >> 1.log ; done ]# curl -X POST -d '[{"headers":{"flume":"flume is very easy!"}, "body":"111"}]' http://master:52020]# cat conf/flume_kafka_2.conf # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source#a1.sources.r1.type = exec#a1.sources.r1.command = tail -f /root/9_codes/flume_test/1.loga1.sources.r1.type = httpa1.sources.r1.host = mastera1.sources.r1.port = 52020a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Buildera1.sources.r1.interceptors.i1.headerName = keya1.sources.r1.interceptors.i1.preserveExisting = false# Describe the sinka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.brokerList = master:9092#a1.sinks.k1.topic = badou_flume_kafka_test#a1.sinks.k1.topic = badou_storm_kafka_testa1.sinks.k1.topic = newyear_many_test# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
    2 留言 2019-03-04 15:05:37 奖励10点积分
  • 大数据日志系统9、flume

    前文链接:https://write-bug.com/article/2123.html
    Flume日志收集系统
    Apache Flume是一个分布式、可信任的弹性系统,用于高效收集、汇聚和移动 大规模日志信息从多种不同的数据源到一个集中的数据存储中心(HDFS、 HBase)
    Flume它是一个消息采集系统,什么是消息采集呢?
    消息就是说你的数据源也就是你的消息源,在这个用户他会通过一系列行为他会留下大量的行为数据或者是行为消息,那这些消息都是更接近于更原始的最原生没有任何过滤的一些有价值的信息提取,相当于是整个的一个记录序列里面,它既有价值信息又有参杂的一些过渡修饰的一些结构,需要被过滤的一些消息,那这个时候你需要把这些大量的消息从数据源开始进行一个收集,因为用户在去留下它们的日志行为的时候,其实这些行为都会被留在了或者被散落在了各个不同的服务器的一个角落,那相当于这些服务器也就是散落在不同机房不同地域的一些各个的数据节点上或者一个服务器节点上,那这个时候它这个数据是非常一个散落的状态,这个时候就需要一个服务,把这些散落的这些原始日志进行一个统一的一个收集,然后供后续的整个的流程比如什么数据过滤,数据入库,数据挖掘等等这些后续我们有待于去进一步操作的事情,所以第一步首先想办法怎么把这个最原始的数据先对接过来,那这个时候就需要用类似于Flume这样的一个消息采集系统到目前为止我们在之前的学习过程中已经完成了很多的一些重要的一些组件,那不同的组件其实有各自的特点然后每一个组件都适合不同的场景,那其实我们在之前不管学hive也好学hbase也好或者是学hdfs,那你会发现这么多个组件其实在整个的这个架构里面它们都处于一个完整项目的一个中下游,中下游就是相当于说是消费者这么一个状态,那起码你的有生产者,生产者你不可能消息一开始就生产到了HDFS上对吧?
    然后这个时候你需要通过一个中间介然后把最原始的消息采集过来然后再去传到后面不管是HDFS上还是hbase这些等等的存储上面去,这个时候相当于我们之前学的不管是HDFS,hive,hbase都是属于下游的一个角色,而且我们还学了一些集成框架,这框架有mapreduce,storm以及spark都是来解决数据计算的一些问题,然后hive,hbase主要是解决一个数据存储和结构化的问题,所以这个时候我们既然已经学了这么多处于一个从消费者的状态这么一个角色一些个组件的话,我们就要想我们这些数据的源头是哪里对不对?那很多时候我们做项目的时候这数据已经给你好现成的了,你就直接去做处理就可以了,你不需要关心这个数据源在哪里,但是你从一个完整的一个项目的宏观角度去观察,你必须要知道这个数据它的来源是在哪里是不是?所以为了保证整个的项目的完整性,保证你对整个的一个数据流的一个打通一个鼻环的一个认识,就是你的数据采集这一块也是有必要掌握的对吧?所以数据采集这一块我们通常就用这个flume方式进行一个消息采集
    数据源:

    server Log(tail、grep查看):webserver
    远程调用:http接口(url)、RPC
    网络:netcat:IP:port(生产消费)
    文件系统:目录树数据变化
    终端:Console
    文本:Text
    数据库:Exec

    你有数据采集之后那你接下来就假设把这个原始信息拿到了,那么就需要把这个数据做一个缓存,先把这个消息进行存储起来,然后存储起来之后因为你这个消息会存在着大量的无效的一些信息,你需要做一些有效字段或者有效结构化一些提取,这时候就涉及到了数据过滤环节。
    跟着这样的一个思路,从数据源开始通过一个服务把数据采集下来,采集的数据需要通过某一个存储或者是某一个缓存把它暂时的存储起来,起码这个数据先落地,落地到具体哪个位置的话这个先不用考虑,起码先把这个数据拿下来,那下来这个数据因为比较原始所以你需要对这个数据进行过滤,因为这数据存在大量的无效的数据。
    然后接下来过滤完之后你需要做一些个转换工作,比如说你这个数据是从一个非结构化的数据变成一个结构化的数据是吧?你怎么把这个数据从你的文件系统里面怎么样转换到一个像数据表格那样,字段然后记录行列之间非常清晰分布这么一个状态,这样有便于后续的一些分析,那转换后的数据就很明显了,需要把它进行一些存储,比如说把它存到HDFS上或者存在你的hbase上等,好了你把这个数据存储完之后那接下来需要做一些检索工作,就需要用于一些检索用,那检索相信你这个数据存储的话你是肯定是要后面来用的对吧?那怎么让下游用的更方便或者是更快捷?这里面就肯定涉及到了一些键索引,那你比如说像之前搞过mysql的人,然后为了让你的数据检索的更快那它自身会支持一些个索引。
    那在整个的大数据里面比如说这个HDFS,比如存储在hbase上那这个时候估计大家就会有疑问了,这个索引怎么建呢?那这个时候其实跟我们后面要学的数据挖掘部分怎么去做一个数据分析还是有一定的关联的。Hbase这块它也是支持一定的索引的。
    那不管怎么说就目的就是能够快速的检索你的数据,那检索到数据之后就开始做一些个数据分析,然后分析后的数据就是把你有用的信息怎么去挖掘出来,然后把挖掘这最后的数据进行一些服务,大概是这么几个环节(如下图所示)

    那整个的一个数据的一个走向那从上游慢慢从水游一样游到了下游,那你会发现数据采集这一块是非常非常贴近于最源头了是吧?那所以今天我们就要了解数据采集这块,那有很多人不太理解为什么你今天非要讲flume,为什么不讲kafka,那有基础的同学那在上图1中里面应该是处于哪个模块?Kafka是一个消息缓冲器对吧?那么相当于kafka是相当于上图1中的缓存器,那么我们学完数据采集再去学kafka,像这样的话你会发现会整个的思路来说会比较更顺畅一些是吧?我们先学上游再去学下游这么一个思路,那么这个过滤是需要做什么呢?这个过滤结合我们之前的内容来说,我们这个过滤如果要做你想怎么做?你可以用mapreduce用storm等去实现,我们之后会有一个案例,怎么把flume和kafka和Storm关联起来对不对?相当于整个链路就打通了。那转换这一块就好说了,不就是转成hive或者是hbase。
    从最开始的图我们就可以看出Flume在其中是一个承上启下的角色,左边流入,右边流出,而且你后面的分析,数据挖掘都是在你的统一的这套存储系统上去做的,主要是想办法怎么去让日志消息收集过来,就是这么一个作用。除了上面多种方式高效接入接出数据的特点,Flume还支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等特点还可以被水平扩展。那么这几个特点是什么意思呢?
    比如说多个log服务器和一个flume和两个存储系统,好了那我这个日志就可以通过flume集群,其实这个flume只画了一个模块,其实这个flume你可以搭建一个集群的方式通过多个不同的机器来维护整套的消息采集这么一个系统,因为这个消息量还是很大的,你只要通过一台机器的话,那通常是搞不定,所以flume还是通过一系列集群来去并发的收集信息。
    然后flume的这个数据也可以进行一个多路输出,意思就是你可以把一个消息可以选择性的去存储到1还是2中,比如说你这个存储1,你这个存储1后面走的挖掘策略就不一样了。这存储2就是另外一套挖掘策略方案(如下图所示)

    而且这两个日志可能不是完全一样的,比如说这个日志1是来收集展现,这个日志2是来收集点击的,那这个策略1可能就是对展现日志进行一个处理,这个策略2对点击日志进行一个处理,那所以这个时候我们希望展现日志可以通过这么一个管道的方式能够有效的流入存储1里面去,我就不想这log1里面的日志不想流到存储2里面去,那么相当于是flume一旦发现你这个源是来自日志1,那我就可以自动把你这个数据直接放到存储1里面去,并不是说把你的数据能够复制,把同样的一个日志只要你这个下游都是对接到了flume上,那下游所有的节点都会收到同样的一份消息,那flume可以这样有区别的进行对待处理,这是一个它可以指定有效路径的方式,这个方式叫做复用机制。
    还有一个就是复制机制,就是说我不管你这个flume前面这个数据源是什么,只要是你来了一个数据源,只要是我这个存储是属于flume下游,那我所有的存储器我都会接收到同样的一份数据,比如说你这个log1的一条数据进来了,那比如说后面有两个存储,那相当于把你一份的数据我复制成两份,每一份节点我都发一份,这是一个复制机制。
    然后多管道接入流量这块也可以体现出这个问题,就相当于这两个日志有两个管道,那这两个log日志是来自不同的一个日志源级别的,这时候flume就可以通过不同的管道去对一些不同的日志源,然后多管道接出,剩下一个上下文路由,路由刚大概说过了,然后水平扩展这一块因为这个flume通常用的时候也是通过一个集群的方式去用,这个flume你可以进行一些个扩展,比如说你一些节点就是在数据采集过程中不够用了,你可以往上面加一些个节点或者资源然后共同的支撑共同的并发,相当于是可扩展性比较强把。

    那我们先从外部的整个框架入手,那最左边是一个消息的一个发生器(Data Generators),什么是消息发生器呢?就是一个日志服务器,比如说就是你们公司里面那个用户接收请求的它的一些请求信息,然后一些收集的这些服务器,就是一些webserver,那相当于日志发生器就是它已经开始陆续的产生消息了,那后面有一个橙色的一个大框对吧?,这里面就是一个整个的flume,然后flume就是从这个发生器(Data Generators)日志来进行采集,采集之后又得到了后面的一个HDFS或者是hbase存储里面去。
    好了这个时候把这个整个的flume这个黑盒的面纱解开你就可以看到这个flume,里面分成了一个Agent和一个Data Collector,Agent的意思就是一个代理模块,它是用来对消息进行接收和汇集。比如有两个log server1和log server2,那么这个Agent通常是部署到跟你同一个server同一台机器上,你这个og server1是用来不断的产生的日志消息,然后你一旦产生这个消息由这个Agent1这个消息来从你这个server上直接发送出去,那把消息发送给谁呢?就发送给一个叫collector(如下图所示)

    所以你从这图8里面就可以看出我虽然这个flume里面包含了Agent和这个collector,其实通常来说Agent和这个collector是分班部署到不同的节点上的,就是结耦。
    通常来说就是你的Agent会很多,因为你这每一个log server都有一个Agent,那你这个server通常会很多对不对?所以Agent也会很多。那这个时候你后面不是像上图8画的一样,一个Agent和一个collector一一对应并不是这样的,通常来说就是一个Agent可能会对应多个collector,就相当于是你前面有多个Agent的消息统一的被你的collector进行一个收集,一般来说server和Agent是一比一的,好了那Agent把消息发给collector,因为这两个属于不同的机器,这个时候collector会去把真正收集到的信息再去做一个存储(Storage),因为这个存储就HDFS或者是hbase,所以这一块就不需要大家开发了,那你就需要把collector怎么能够通过一个配置的形式把前面消息能够直接发送到指定的相应配置的目标路径上去就可以了,所以通过这个地方大家先了解一下你的Agent和你的collector的定位是怎样的(如下图所示)

    Agent就相当于冲到最前线的,这个collector就相当于是后方基地,然后不断去接收前线的一些消息,然后它在把这个消息怎么再往存储上再去做处理,collector也是可以多个的。
    接下来再看一下我们在去讲storm的时候,storm也类似于这么一个流程是不是?从头往后一个数据流进行一个传输是不是?然后再storm里面它的数据流也是有一定的单位的形式做传输对吧?那这个storm的单位是什么呢?tuple对吧?,那在hdfs上数据的单位是一个block对吧?在flume里面数据单位是Event,是一个事件。假如说在整个的flume里面它内部流转了这些消息都是一个事件,所以flume是用这个Event对象来做一个消息传递的格式(如下图)

    它属于内部数据传输的一个最基本单元,那你把这个事件已经打开,打开它有两个部分组成,那第一个部分就是一个Header,第二个部分就是一个Byte Payload,就是你的头和身体对吧?通常这个数据这个头部你可以有也可以没有这个可以选择的,不一定说这个header就是一定要存在的好吧?那如果说这个header要存在的时候可以理解为一个key,body你把它想象成一个value,如果你把数据有一个key有一个value,大家很容易想到的是在mapreduce里面有一个partition对吧?partition就是用来做分发消息的,也就是说这个Event有两个部分,Header和Byte Payload,这个Header是可有可无,如果是它没有这个Header只有byte Payload的时候,那么byte Payload其实就是存的是数据,那这个时候数据就开始往后流向进行传输是不是?这是一个最直接的流程,但是有的时候你需要对这个数据做一些个路由,最后一些个分发,就是说有的消息我想分发到A节点上但是我不想分发到B节点上,有的消息我想分发到B节点但是我不想分发到A节点上,那对于这种有特殊需求的情况,这个时候你就需要用到header,你必须要分配给它一个key,然后这个时候它做分发key的时候就根据你这个header里面的信息去做一个数据的一个路由,有点类似于分桶,所以相当于把这块跟我们之前学的partition结合起来可能理解起来更容易一些。
    header是key/value形式的,这个其实跟我们说的key和value不属于同一个层次,就是这个header如果你要是有这个信息,这个信息就大概长(k:v,k1: v1)这样的样子,然后你去做一个partition的时候你就根据这个key和value去做一个分发这么一个情况,所以这个时候大家就记住一点我的header就是为了做分发用的,Byte Payload就是存实际的数据内容用的。
    好了这个时候就讲了一个比较重点的东西,就是一个代理(如下图所示)

    这个Agent刚刚我们讲过了,这个flume可以拥有多个Agent,当然也可以拥有一个,然后每一个Agent就是一个进程,这个进程就相当于是在你的服务器上一直运行着然后一直监控着你的这个消息,一直监控着你这个日志的产生,一旦你有日志发生变化了,那这个进程就会把你的消息进行一个数据的收集然后往下游不断的传输。
    如果说你把这个Agent再进行打开,再进行把你的内部细节暴露出来,Agent就可以暴露出三个部分,这三个部分就是source和Channel和sink这三个,那么这三个模块有什么用呢?source就是真正对接你数据源进行输入,而Channel就是一个管道,sink就是一个输出,就是你的source把消息接收过来,然后消息会存到你的管道里面会做一个缓存,缓存我们之前学过一些对吧?这个缓存存储可以是文件形式的,相当于就是在你本地一个落地到磁盘上的那个文件对不对?还有一个就是在你内存分配出一个区域,就是这个数据在你内存中扭转的,不落地这是通过一个memory的形式,所以你的数据输入是可以存在你的文件里面也可以存到你的memory里面,那存到memory里面会更快一点,但是有一个问题就是一旦你这个agent出现了问题那你这个消息因为你存在memory里面,在消息可能会存在丢失的风险。但是为了保证你的消息的可用性可靠性通常建议把消息直接存在你的文件里面,但是这是存到你的文件里还是存到你的内存里面这是你要通过一个配置去配置的。

    source :输入-》对接各种数据源
    channel:缓存管道(file,memory)
    sink :输出-》对接各种存储

    然后输入就是对接各个数据源,输出就是对接各种存储,所以相当于是每一个组件都是各司其职,然后彼此之间能够协同工作,然后让消息能够有效的在内部进行一个扭转。如果要是在Agent里面我们在对各个组件在做一个更深入的了解,那我们接下来看一下source
    source是一个整个的flume的源,它是最贴近于你的消息源的那么一个模块对吧?。它相当于就是一个数据源的外部采集,然后把它外部源数据接收过来然后变化成flume可以识别的格式。这个格式就是一个事件(Event),然后在从这个flume开始内部进行流转。
    然后Channel就是一个通道,刚才我们一直说缓存,你可以把它理解成缓存就好
    通道:采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理
    所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存 起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channel是一个 完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source 和sink链接
    可以通过参数设置event的最大个数
    这时候大家会有一个疑问了,你是一个存储器,那如果要是采集的这个消息量非常大,那一旦超过了你这个缓存的限制,那相当于你的内存就爆掉了对不对?这时候会导致一些节点的风险对吧?会不会有一个数据的累计,然后不断的去膨胀这么一个风险。它是可以通过配置去配置的,就是控制流量,就是控制你这个source从外界接收这个数据每一次接收需要接收多少个事件,它是有一个流量控制的,如果你前面流量放的很足那肯定会对这个存储内存会有一定的压力,那一旦有压力你可以减少这样的一个采集量就可以能够进行一个减缓,这是可以通过一个配置event来进行配置。

    Flume通常选择FileChannel,而不使用Memory Channel
    Memory Channel:内存存储事务,吞吐率极高,但存在丢数据风险
    File Channel:本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)

    另外一个就是消息传到存储这块来之后,那它需要sinks来去对它进行一个消费,所以它这个Channel在这个source和sinks之间搭建了一个桥梁作用。那刚才我们说过了这个Channel它既然是一个存储,那你这个数据可以存到你的FileChannel里面,然后memoryChannel都是把数据存到内存,吞吐力高,效率高,但是容易存在丢数据的风险,那么FileChannel就是需要把你的数据落地了是不是?一旦你这个机器挂了,数据也不会丢失。
    然后sinks相当于就是在整个的Agent里面,sinks就是一个消费者,怎么把这个消息消费掉,那消息是在你的Channel里面,sinks会将你的消息或者你的事件从Channel里面进行,然后并且把你的事件开始往外输出,输出到外部的存储上面去

    Sink会将事件从Channel中移除,并将事件放置到外部数据介质上

    例如:通过Flume HDFS Sink将数据放置到HDFS中,或者放置到下一个Flume的Source,等到 下一个Flume处理。 对于缓存在通道中的事件,Source和Sink采用异步处理的方式
    Sink成功取出Event后,将Event从Channel中移除
    Sink必须作用于一个确切的Channel
    不同类型的Sink:

    存储Event到最终目的的终端:HDFS、Hbase 自动消耗:Null Sink 用于Agent之间通信:Avro

    然后一旦这个消息被sink消费掉之后,这个消息就会从这个Channel里面就会移除,它有点类似于队列的形式,那最后你这个数据是以哪种存储的形式落地了呢?是由sink来决定的,也就是你需要通过配置来控制sink你最终数据是怎么样的方式输出,你的数据是可以存在你的HDFS上或者是hbase上,它没有一个默认的一个输出,这个需要你通过一个很简单的配置你可以来控制这数据是怎么样输出来的,另外一个其实在整个的flume集群里面它是可以允许有类似多个flume,然后进行一个彼此之间的一个关联,这个像flume我们刚才打开过,它主要里面是一个Agent是吧?然后你可以把它当中一个玩具一样,然后进行一个彼此之间的拼装,然后你可以把这个集群做的规模很复杂或者一个很简单都可以,所以它在整个的集群或者是消息采集过程中它的这种集群搭建还是很灵活的。
    比如这个Agent如果你在本地搭建的时候,你这个Agent是可以直接存储到你的存储上的,这是可以的,但是有的时候你的Agent和你这个server是部署到了同一台机器上,那你这个机器就是为了来存储日志的,那你在给这个机器再去开放往这个Storage这个机器上去写的这个权限就不太合适,所以就需要把Agent数据再转到一个统一的一个中心,然后这个机器就可以进行一个对外的写服务(如下图所示)图:

    这个是一种形式,另外一个collector它得把这个分散的数据进行收集,所以通常用的时候就是配了一个agent和一个collector,但是从字面上来看这两个感觉差距很大,其实agent和collector你会发现配置的时候基本上是一样的,只不过是数据源不一样,agent的数据源是来自于你的外边真实的外部数据,你collector来的数据是来自于你的agent,就相当于你来自你的flume组件,其实这个agent和collector本质是一样的,只不过是数据源来源不一样,只不过是为了区分他们的角色,如果说遇到了这种内部组件之间的一个对接就相当于这两个flume之间的一个对接的话,那这个数据通过传输的方式的话就需要通过一个avro的方式进行对接(如下图16)

    你collector要对接的话必须要通过这样的方式去对接,你这个agent这个就很丰富了,对接着这种数据源的格式,数据源的类型就很丰富了。
    刚才我们已经说过了它一个agent内部分为最基本的三个组件是source和channcl和sink,那么这三个组件都是缺一不少的,但是它还有两种组件是可以选项,就是说你可以用可以不用,根据你的业务需求,如果你的业务需求确实是涉及到了这方面的一个要求那你这个组件就应该用,那么source和channcl和sink这三个是必须要有的,还是有两个可选的组件分别是interceptor和一个selector,interceptor是拦截器(如下图所示)


    Interceptor用于Source的一组拦截器,按照预设的顺序必要地方对events进行过滤和自定义的 处理逻辑实现
    在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到 source之前,对日志进行一些包装、清新过滤等等动作

    这个拦截器是在什么位置呢?就是说这个interceptor是在你的数据源和你的source之间的一个环节,那这个环节相当于就是可以对你的数据源提前会做一个过滤,然后这个selector是有点类似于路由选择,就是消息已经在这了,我开始对这个消息进行一个存储,你这个消息不是要存到channel上吗?如果你后面如果有多个channel的话,那这个消息我应该是存到哪个channel上,或者是所有的channel都应该存储这样的消息,这个时候就要配置一个selector,所以selector这块就是像我们最一开始说的复制与复用(如下图所示)

    我们继续看拦截器,拦截器主要是对这个event进行一个过滤或者是自定义一些处理逻辑的实现,它主要是在你这个日志与source之间的,然后对这个日志进行一个拦截,就相当于提前制定哪些日志可以往后传,哪些数据可以直接被丢掉,然后它除了拦截之后它还可以对你的日志数据重新做一些个包装,那主要的提供的一些拦截器就有这么几个

    Timestamp Interceptor:在event的header中添加一个key叫:timestamp,value为当前的时间戳
    Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip
    Static Interceptor:可以在event的header中添加自定义的key和value
    Regex Filtering Interceptor:通过正则来清洗或包含匹配的events
    Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

    这些拦截器可以直接互相组合,就是不仅仅通过一个拦截,可以通过多个进行一个拦截器的拼装,通过这个chain的方式进行一个组合起来,然后对于组合之后的话,你可以对它进行一个前后的一个顺序依次的处理。
    然后我们再看一下这个selector,这个selector这块也是容易理解的,刚才我们说过这个selector它有两个事情,一个是复制和复用对吧?那这复制就是分别对外两个配置,一个配置就是这个Replicating,还有一个复用Multiplexing,复制刚刚讲过了,就是一个消息能够被复制多份,复用就是一个消息可以选择性的去选择(如下图)

    channel selectors 有两种类型:

    Replicating Channel Selector (default):将source过来的events发往所有channel
    Multiplexing Channel Selector:而Multiplexing 可以选择该发往哪些channel

    这个source前面有两个不同类型的消息,那这个一个类型的消息你可以选择后面的一个channel,如果只选择某一个channel去做传递消息的话,你可以选择复用的方式,如果这一个消息可以被复制多份,就像一个广播的形式发送消息的话,广播是什么意思呢?广播就是一个消息,被复制出多份然后下游每一个节点都同时接收同样的消息是不是?,那这样的情况就可以用复制的形式去用。
    问题:Multiplexing 需要判断header里指定key的值来决定分发到某个具体的channel,如果demo和demo2同时运行 在同一个服务器上,如果在不同的服务器上运行,我们可以在 source1上加上一个 host 拦截器,这样可以通过header 中的host来判断event该分发给哪个channel,而这里是在同一个服务器上,由host是区分不出来日志的来源的,我们必 须想办法在header中添加一个key来区分日志的来源 – 通过设置上游不同的Source就可以解决
    然后接下来就看一下从整体的角度来看可靠性,那为什么说这个flume它的可靠性还是比较OK的呢?那从这么几点来看

    Flume保证单次跳转可靠性的方式:传送完成后,该事件才会从通道中移除
    Flume使用事务性的方法来保证事件交互的可靠性。
    整个处理过程中,如果因为网络中断或者其他原因,在某一步被迫结束了,这个数据会在下一次重新传输。
    Flume可靠性还体现在数据可暂存上面,当目标不可访问后,数据会暂存在Channel中,等目标可访问之后,再 进行传输
    Source和Sink封装在一个事务的存储和检索中,即事件的放置或者提供由一个事务通过通道来分别提供。这保证 了事件集在流中可靠地进行端到端的传递。

    Sink开启事务 Sink从Channel中获取数据 Sink把数据传给另一个Flume Agent的Source中 Source开启事务 Source把数据传给Channel Source关闭事务 Sink关闭事务

    首先就是说它有一个事务性,这个事务性什么意思呢?就是我们刚才已经提过了,它主要是和channel类型有关,刚刚我们说了channel类型有两个一个是file一个是memory对吧?为了保证消息不丢失,为了可靠就是你可以选择file对不对?通过file channel的方式去传输,然后另外一个就是说当我这个消息在传输的过程中,当传输到了下一个节点上,那如果要是说接收的这个节点出现了一些异常,比如说一些网络异常,那由此就会就可以导致你的数据就需要重发的,然后另外在同一个节点内,如果是source写入的数据,把这个数据已经写入到这个channel里面去,那这个数据在,比如说它写这个数据它也是成批成批写的,那同时在这批之类,它整体的数据出现了一些异常的话,那这个时候就所有的数据一旦有一个数据出现了异常,那同一批的其他数据都不会写入到channel里面去,那已经接收到的部分这批数据就直接被抛弃,然后这个时候靠上一个节点重新再发送一些数据,重新再补充一些数据进来,那这里面就涉及到了事务性,那flume是使用了一个事务性的方式来保证了传输event或者传输整个事件在整个过程中的可靠性,就是说在你sink必须在你的event传入到channel之后或者是已经这个event传输到下一个channel里面或者是你这个消息已经到了外部的数据目的地之后,就相当于你这个数据已经可以认为是被下游已经完整的接收到了,就是你的数据已经在下游非常可靠的落地了,这个时候你的event才能从你的channel中进行移除掉的,所以这样的机制才能保证你的event无论是在一个agent里面还是多个agent里面之间的流转都是可以保证可靠的,并且由于这样的一个事务保证,你整个event就可以被成功的存入起来。
    然后这是一个整个消息在传输过程中比如说一个端到端传递的一个步骤,就从你的前面的sink怎么把你的数据传输到下游的另一个agent里面去这么一个过程。
    好了这个flume大概我们之前也说了,可以支持一个很强的扩展性是不是?(如下图所示)

    你可以把它想象成一个乐高玩具一样,然后进行一个有效的拼装,比如说这是一个其中一个组件,然后把这个组件进行一个前后的关联,你可以把这个组件之间并行多套,或者你可以把这个组件进行上下游进行一个关联,就是上游的sink要对接到下游的source上面,也可以多个sink同时消息汇入到下游的同一个source里面去,并且你可以做更复杂的一些搭建,这些都可以通过简单的配置就可以去完成。
    复杂流动的目的就是说这根据你的业务场景的一个复杂程度了,那你每一套agent可能下游就是面对着业务处理的流程是不一样,这个是完全是可以根据你的进行一些选择,比如如下图所示

    每一个webserver上都部署一个agent对不对?那你这有三个webserver就对着三个agent对不对?那你在这个实际的架构过程中你的日志服务器是有很多节点的,那你不可能每一个节点,如果是没有这个Consolldation的话,你每一个节点都去往这个HDFS直接去写的话,这个就不太合适,因为你这个日志服务器它仅仅是用来做日志收集做的,而你把它权限在放开到再去写HDFS,它相当于是那个角色定位有些混乱,另外一个就是说你这个数据源并发同时去写的话对HDFS操作也不是一个很好的设计,所以最好把这里面数据都汇总到同一台或者是数目比较少的那几种,只要是你那个压力能扛得住的一些Consolldation进行一个集中处理,然后Consolldation再去对接到后台的一些存储服务,这样会前面和后面部分相当于它们面对的角色是不一样的。
    还有一个就比较典型了(如下图所示)

    这个sink和这个source的一个关联。然后我再看一下这个(如下图所示)

    这个上图其实有点像我们之前讲的路由选择,你看这个左边的source可能通过不同的方式去对接的,然后把这个不同的消息通过一个channel然后通过多个sink,每一个sink都对街到后面不同的应该是Consolldation,然后每一个Consolldation进行各自的一个日志收集,那你这里面就跟你的业务相关了,可能是你这个Consolldation之间收集的数据是一致的也有可能是不一致的对不对?,然后后面对着不同的sink,这不同的sink就往后端存储的时候你可以写到HDFS上也可以写到hbase上,因为你这个数据是可以,比如说这两个channel就对接着不同的存储是不是?那你上面的这个channel是往HDFS上去写,下面这个channel是往本地文件去写,但是你可以再搞一个channel然后往Hbase上去写都可以很随意。

    搭建http://note.youdao.com/noteshare?id=7d903eb22b05f0b4943389bfc5c6d51f&sub=1993705EC6A1439DB03B29D68D278BFC
    2 留言 2019-02-10 12:33:52 奖励16点积分
  • 大数据Storm平台11、Storm

    前文链接:https://write-bug.com/article/2229.html
    Storm在前文中我们说过了离线数据挖掘的MR和Spark-core,而Storm是通过流式处理实时数据的计算框架。
    MR中

    处理海量量数据,吞吐能力强
    一次性处理整个数据集
    大批量输入大批量输出
    中间磁盘落地dump
    任务执行完结束

    Storm中

    实时分析数据(实时报表动态、流量波动、反馈系统)
    时效性(毫秒级)
    增量式处理
    全程在内存种流转
    任务无结束


    在MR中,我们输入数据都是一次性把文件直接塞给处理程序,之后MR会自动分片数据分为多少个任务去处理,而在Storm中,实行增量式处理,就是来一条数据处理一条数据(类似生产线源源不断产生和输入),可以做做公司的实时报表、监控流量、迭代数据,用输出数据来指导输入数据、去做反馈系统。

    在提到Storm时,有个流式处理的概念,而实际上刚才说过就像工厂罐头传送带一样处理并输出,在我们学MR时,有个本地调试命令:
    cat input|python map.py|sort|python reduce.py >> output
    就像一个管道似的被接连处理。
    在分布式流式计算这样的大环境中,Storm只是其中一种实现方案,后面会有Spark-Streaming(分钟级别)等。而分布式需要解决什么?流量控制、容灾冗余、路径选择、拓展。
    Storm特点

    无持久化层—>速度快,中间无dump,整个过程在内存中进行。
    保证消息得到处理—>可靠性
    本地模式—>模拟集群功能
    支持多种编程语言—>thrift RPC协议实现
    0.8版本之前ZeroMQ做底层消息队列—>高效
    原语spout/bolt—>类似map/reduce

    Storm基本概念

    框代表同一角色,一个圈是一个spout task/bolt task,四个圈是四个并发度。

    Stream:以tuple给基本单位形成的有向无界数据流
    Tuple:最基本的数据单元,包裹内部数据类型——对应HDFS的block
    Topology:由spout和bolt组成的网络拓扑图,类似MR中的Job,但不会结束,除非主动Kill

    任务提交:Storm jar code.jar MyTopology(类) arg1(参数) arg2storm jar 负责来连接Nimbus(相当于MR的Job Tracker)并上传jar包stream-Grouping等方式连接:Shuffle/Fields
    Spout:消息生产者(水龙头)

    可以对接很多类型的数据流收集消息处理的ack、fail,那成功和失败了后怎么处理还需要自己开发,所以如果消息失败,可重新emit一个tuple,后续处理可自行开发可指定emit多个Stream流:OutFieldsDeclarer.declareStream定义,SpoutOutputCollector指定(随机)nextTuple 开发时主要函数
    Bolt:消息处理逻辑

    过滤、访问外部服务(数据库等)、数据格式化、聚合、汇总。。。多bolt处理负责步骤execute 主方法

    在业务角度看:
    MR+Storm结合使用

    在某种场景下,如果算法足够简单或者MR和Storm的算法可以达到同样效果情况下,可以用Storm做整条数据处理,但是最好用MR去对以星期/月为基准做一个处理保障,毕竟MR更稳定并且有中途存储机制,而Storm一次处理的数据量又有限,如果算法比较复杂,就只能用Storm尽量拟合结果做输出,之后MR再以天为基准做处理并存储.
    如果Storm运行中突然挂掉,那处理方式肯定是报警人工重启了,但是在挂掉到重启的这段时间的数据肯定是丢了,如果不找回来会影响我后面计算的准确度,那我如何回溯这部分数据呢,一种是去上游(比如说是Kafka消息队列)回溯消息,另一种是借助存储机制(比如说Storm后面可以连接Hbase)可以通过Hbase的时间戳一判断就可以回溯到这部分数据.再不济情况下反正我Storm对结果的拟合度并不高,那只是今天有影响,第二天我就可以通过MR追回来数据的效果,根据业务来考虑.
    Stream-Grouping

    shuffle-grouping:随机分组,负载均衡
    fields-grouping:按制定的field(相当于key,可开发)分组
    all-grouping:广播分组
    Global-grouping:全局分组(类似合并)

    Storm应用场景
    常见模式:

    流式
    过滤:bolt逻辑组装(多到一)


    此图主要展示组装概念,当然也可以像下面的图一样组装


    Join(多到一)这里的join不像MR中的全局join一样保证数据,因为毕竟数据不完整所以业务用的很少
    持续计算——机器学习迭代用bolt计算的输出来指导spout的输入,迭代计算
    分布式RPC——独立服务比如在用户所请求的服务器上设置redis数据库,通过请求数据计算并输出到个性化数据存储到redis有延时性.


    Storm架构


    主:Nimbus:分配任务和资源调度如果挂掉:因为本身不存储并且在内存中流转,重启之后,像什么事情没有发生一样——无状态(快速失败fail-fast)
    从:Supervisor:监控工作接收任务快速失败fail-fast,监控自己的Worker工作
    Worker:工作进程,内存有多个executor
    executor线程池,里面维护很多task(轮转执行),默认每次只会执行一个task(可配置)
    Task:线程,Storm最细的粒度,本质是一个节点类的实例对象spout和bolt的线程都是task
    Zookeeper协调管理,所有状态数据在zookeeper上或本地磁盘上

    通过几句代码可以直接构建一幅拓扑图:
    //对象内部有默认配置,可修改 Config conf = new Config(); //设置Worker数量 conf.setNumWorkers(2); // 设置Executor数量 topolopyBuilder.setSpout("BlueSpout", new BlueSpout(), 2); topolopyBuilder.setBolt("GreenBolt", new GreenBolt(), 2) .setNumTasks(4) // 设置Task数量 .shuffleGrouping("BlueSpout"); topolopyBuilder.setBolt("YellowBolt", new YellowBolt(), 6) .shuffleGrouping("GreenBolt");
    在一次任务中,可能一个executor有多个task,而且数目一旦生成除非改代码否则不能动态改变,那我们可以动态调配这次任务的并发度

    重新配置Topology “myTopology”使用5个Workers
    BlueSpout使用3个Executors
    Yellowbolt使用10个Executors

    ]# storm rebalance myTopology -n 5 -e BlueSpout=3 -e YellowBolt=10

    strom容错
    架构容错:

    Zookeeper

    存储Nimbus与Supervisor数据
    节点宕机

    Heartbeatworker汇报executor信息supervisor汇报自身信息通过zookeeperNimbus
    Nimbus/Supervisor宕机

    Worker继续工作Worker失败,任务失败
    Worker出错
    Supervisor重启worker

    数据容错:

    timeout

    防止集群阻塞
    ack机制

    ack本质是一个或多个task,特殊的task,非常轻量,工作:反馈信息和透传每个Topology都有一个Acker组件
    所有节点ack成功,任务成功

    实现:tupleid和ack的tupleid作异或=0表示成功




    一个tuple没有ack

    处理的task挂掉了,超时,重新处理
    Ack挂掉了

    一致性hash 全挂了,超时,重新处理



    Spout挂掉了
    重新处理

    Bolt

    Anchoring 产生新tuple

    将tuple作为一个锚点添加到原tuple上
    Multi-anchoring

    如果tuple有两个原tuple,则为每个tuple添加一个锚点
    Ack

    通知ack task,该tuple已被当前bolt成功消费
    Fail

    通知ack task,该tuple已被消费失败

    Storm开发
    Spout
    public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }Bolt
    public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }本地模式与conf常用配置
    本地模式运行ExclamationTopology的代码: Config conf = new Config(); conf.setDebug(true); //conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology());• Config.TOPOLOGY_WORKERS• Config.TOPOLOGY_ACKERS• Config.TOPOLOGY_MAX_SPOUT_PENDING• Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
    实践:

    Storm-word count
    Flume+kafka+Storm+Hbase+Hive —— 数据通路
    实时推荐:Storm+中文分词
    2 留言 2019-03-07 15:15:34 奖励16点积分
  • 大数据Hadoop平台2-3、MapReduce

    前文链接:https://write-bug.com/article/2049.html
    由于前面的文章再进行深入文字过多会影响阅读体验,所以决定再抽出一篇文章:
    这节主要有这么几个重点:

    MR基础深入
    Hadoop streaming开发,支持大多数语言开发MR
    Hadoop2.6.5搭建
    实践

    MR基础深入
    还是用这个图来讲解:

    map和reduce算子都是采用多进程的并发方式,便于每个任务占用独享空间,方便对每一个任务进行资源调度与调配,但是进程比线程会消耗更多的启动时间,直接制约着任务延迟,失效性不高,只能做离线高吞吐任务,而后面的spark-core反而是采用了多线程模型,所以速度更快,但是吞吐量大时稳定性不是太高
    HDFS:文件要存储在HDFS中,每个文件切分 成多个一定大小(默认64M)的Block (默认3个备份)存储在多个节点 (DataNode)上,文本格式:text file明文格式,但是浪费空间,SquenceFile格式默认<k,v>对存储,支持压缩等等格式
    InputFormat接口: MR框架基础类之一

    数据分割(Data Splits)防止句子被切散,当有一条记录横跨两个block时,在分割过程中会被分割到前面的block中去,所以就会出现split和block不完全相等大小的情况
    记录读取器(Record Reader)以标准输入每读取一条记录,就调用一次map函数,这条记录作为map的参数,读取下一条记录直到split尾部

    map:逻辑处理,标准输入,标准输出
    shuffle:Partion(对keyhash), Sort(对key sort), Spill(溢写), Meger, Combiner(map端的局部reduce), Copy, Memery, Disk……
    partitioner:由{key,vlaue}—>{partition号,key,value}partition号:key ->hash 取模 ->返回m
    Mermory Buffer: shuffle过程都是在这里完成的(比如partition号已经确定,对key排序,合并combiner等等),上节已经说过默认100M,溢写阈值0.8:80M,清空内存,dump到本地

    有时逻辑处理combiner不适用:比如求中值
    reduce : 逻辑处理,标准输入,标准输出,同样也有着内存缓冲区:spil,sort等功能,前面map端的partition和combiner直接影响到这里获取的输入源不会出现key重复的情况,所以合并reduce输出文件就是最终结果了。
    slot概念:map和reduce有着不同的slot槽位—-cpu核数-1—-一种容器资源

    mapred.tasktracker.map.tasks.maximum(默认2) mapreduce.tasktracker.tasks.reduce.maximum(默认2)mapred.local.dir(map中间结果存放路径)dfs.data.dir(hdfs目录存放路径)

    在这里我们列举一下其中的细节知识点:

    JobTracker只做管理和通知,数据只在map和reduce之间流动,准确的说,只会在TaskTracker之间流动。
    排序是框架内置的,默认就有。分组不是减少网络开销,分组不是合并,只是把相同的key的value放到一起,并不会减少数据.分组是给了同一个map中相同key的value见面的机会.作用是为了在reduce中进行处理.map函数仅能处理一行,两行中出现的这个单词是无法在一个map中处理的.map不能处理位于多行中的相同的单词.分组是为了两行中的相同的key的value合并到一起。
    在自定义MyMapper类内部定义HashMap处理的是一个block,在map方法内部定义处理的是一行。
    在hadoop全局中不会有线程问题,因为hadoop起的是进程,不会有并发问题存在.这也就造成了MR的速度问题。
    map和reduce不是1对1的,通常map数量远远超过reduce,reduce常常是每个节点上一个。
    map个数为split的份数。
    Reduce个数等于输出个数
    dfs.block.size决定block大小,默认64M—-hdfs-site.xml
    在memory排序:速排—-80%水位线:先排序再溢写
    把每一个block变成一个RecordReader
    分而治之思想:分:map(split)—-一对一的关系 分发:patition—-相同的key统一到一个reduce上 合:reduce
    压缩文件不可切分,非压缩文件和sequence文件可以切分
    增加task的数量,一方面增加了系统的开销,另一方面增加了负载平衡和减小了任务失败的代价;
    map task的数量即mapred.map.tasks的参数值,用户不能直接设置这个参数。Input Split的大小,决定了一个Job拥有多少个map。默认input split的大小是64M(与dfs.block.size的默认值相同)。然而,如果输入的数据量巨大,那么默认的64M的block会有几万甚至几十万的Map Task,集群的网络传输会很大,最严重的是给Job Tracker的调度、队列、内存都会带来很大压力。
    mapred.min.split.size这个配置项决定了每个 Input Split的最小值,用户可以修改这个参数,从而改变map task的数量。
    一个恰当的map并行度是大约每个节点10-100个map,且最好每个map的执行时间至少一分钟。
    reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
    合适的reduce task数量是0.95或者0.75*( nodes mapred.tasktracker.reduce.tasks.maximum), 其中,mapred.tasktracker.tasks.reduce.maximum的数量一般设置为各节点cpu core数量,即能同时计算的slot数量。对于0.95,当map结束时,所有的reduce能够立即启动;对于1.75,较快的节点结束第一轮reduce后,可以开始第二轮的reduce任务,从而提高负载均衡。

    Hadoop streaming
    MapReduce和HDFS采用Java实现,默认提供Java编程接口

    Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中 使用 Streaming方便已有程序向Hadoop平台移植,可移植性好
    原理:在map和reduce外层套了一层标准输入标准输出,内层语言就没有了强依赖,所以在开发过程中,只需要按照一定格式标准输入读取数据,标准输出写数据即可
    容易单机调试:cat input |mapper|sort|python reducer > output
    局限:默认只能处理文本数据,如果处理二进制文件,需要先进行编码转化文本。效率没有Java开发效率高(两次拷贝和解析分割)
    脚本参数:

    input: 指定作业的输入文件的HDFS路径,支持使用*通配 符,支持指定多个文件或目录,可多次使用output: 指定作业的输出文件的HDFS路径,路径必须不存在, 并且具备执行作业用户有创建该目录的权限,只能 使用一次mapper: 用户自己写的mapper程序 “python 、bash等”reduer: 用户自己写的reduce程序 —不是必须的(比如简单过滤)file:本地文件(提交命令的机器)分发到各个节点,适合小文件cachefile:hdfs压缩文件分发到各个节点archivefile:hdfs压缩目录分发到各个节点(自动解压)

    这里有一些jobconf的基本配置:(高版本:-D)

    mapred.map.tasks:map task数目:split、压缩文件、block个数
    mapred.reduce.tasks:控制reduce task数目:个数适当设置,过多输出大量小文件,过少出错还要从新map成本过高
    stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
    num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个 key
    mapred.job.name:作业名
    mapred.job.priority:作业优先级
    mapred.job.map.capacity:最多同时运行map任务数
    mapred.job.reduce.capacity:最多同时运行reduce任务数
    mapred.task.timeout:任务没有响应(输入输出)的最大时间
    mapred.compress.map.output:map的输出是否压缩
    mapred.map.output.compression.codec:map的输出压缩方式
    mapred.output.compress:reduce的输出是否压缩
    mapred.output.compression.codec:reduce的输出压缩方式
    stream.map.output.field.separator:map输出分隔符
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

    利用该配置可以完成二次排序
    -jobconf org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \利用该配置可以完成key排序
    -jobconf stream.num.map.output.key.fields=1 \设置map分隔符的位置,该位置前的为key,之后的为value
    -jobconf mapred.text.key.partitioner.options="-k1,1" \选择哪一部分做partition,根据几到几分区
    -jobconf mapred.text.key.comparator.options="-k1,1n" \设置key中需要比较的字段或字节范围
    Hadoop2.6.5搭建环境:centos7
    http://note.youdao.com/noteshare?id=830761ebbc56f38c926030ca882b4313&sub=482AF51AF94B4E60BA1DE02B7C850B0F这个搭建还是用的HDFS1.0的容灾机制,2.0还可以使用,但在工作中是完全使用2.0机制的,搭建流程请参考(体验):参考文章:https://www.cnblogs.com/selinux/p/4155814.html【192.168.87.150】master1:NN ZKFC RM【192.168.87.151】master2:NN ZKFC RM【192.168.87.155】slave1:DN NM ZK JN【192.168.87.156】slave2:DN NM ZK JN【192.168.87.157】slave3:DN NM【192.168.87.158】slave4:DN NM ZK JN
    实践代码流程
    wordcount
    分发,分发方式:

    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \-cacheFile "hdfs://master:9000/white_list#ABC" \-file ./white_listcacheArchiveFile ——map.py

    os.path.isdir()用于判断对象是否为一个目录
    os.listdir()用于返回一个由文件名和目录名组成的列表,需要注意的是它接收的参数需要是一个绝对的路径
    os.path.isfile()用于判断对象是否为一个文件
    压缩文件传过来,传在read_local_file_func函数中,再通过get_cacheFile_handlers函数中的isdir判断是否目录,再通过listdir遍历目录返回文件名和目录名,再通过get_file_handler函数把文件以只读进来返回文件(文件的指针将会放在文件的开头)。追加f_handlers_list数组中,返回数组给read_local_file_func函数作strip,把白名单中的单词分到set中返回set,之后再判断word是否在set中然后输出。

    red.py

    map输出文件读进来分别付给key和value值,利用一个当前单词和计数池数组mege数组中的单词和数量,之后清空,循环,输出。
    全排序
    第一个:排序

    a.txt b.txt读进来,做split后赋给key,val由于mr是按字典排序,所以需要加成相同字段数red中还原字段输出配置中:强制用一个reduce来处理
    第二个:排序之后再按前50 后50 分成两个桶

    map中模拟一个partition号idx加在前面标识,red中还原数据输出配置中:两个reduce处理,一个partition,从第二项后面分割数据,第三项为val
    第三个:全部利用配置来排序

    map读进来分割后red输出配置中:一共四个字段,前三个为key,后一个为value,根据二三做partition,三个reduce处理

    表单joinAB 表
    A.left join(B)
    A.key <— B(拼)

    Step1: map_a给a文件强制中间加flag 1
    Step2: map_b给b文件强制中间加flag 2
    Step3

    map_join把两个文件以w(打开一个文件只用于写入。如果该文件已存在则将其覆盖。如果该文件不存在,创建新文件。)方式打开strip分别赋值给key1,key2输出red_join标准输入流进来split中 赋值给key,flag,val.判断flag 设立中间val值,输出key,v1,v2,清空中间值

    鲁棒:虽然可以设置一个flag1,隔一个flag2,但是根据业务不同:

    key出现多次flag全是1—-没有join上,但是把key保留住,A中key字段在B中没有,所以需要表示null等flag全是2
    按字典排序的
    Linux:head -20 ip.lib.txt |column -t 格式化挖掘数据,与算法结合折半查找:二分法——排好序的列表
    mid=low+high /2定位
    截断一半high=mid-1
    函数load_ip_lib——func只读加载文件,split,判断是否5个字段,分别赋值,追加给list数组返回,mapper把返回数组传进来,同样对cookie文件split,判断是否2个字段,分别赋值,再把IP字段和list数组传入get_addr函数二分法判断IP字段,并且根据ip段判断province,返回province,mapper输出cookie,IP,address
    -reducer "cat" \-jobconf "mapred.reduce.tasks=2" \-jobconf "mapreduce.reduce.memory.mb=5000" \输出数据压缩怎么控制让mr任务支持压缩功能,通过压缩形式控制后续mr的并发个数
    -jobconf "mapred.reduce.tasks=10"-jobconf "mapred.compress.map.output=true" \#中间结果map的输出开关-jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \#Gzip格式-jobconf "mapred.output.compress=true" \#最后压缩-jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \再把输出数据当作输入看mapcat(标准输入输出)是不是10个map个数
    总结分类

    数据统计 hive(mr)—-word count
    数据过滤—-白名单
    同类汇聚—-join
    全局排序—-单/多reduce
    容错框架(报警邮件)

    Linux:cat ip.lib.txt |awk '{print $1,$NF}'| head 只保留第一个字段和最后一个字段思考:我们如何把这些散装的实践和思想进行拼装应用在业务上呢?
    2 留言 2019-01-23 13:49:16 奖励23点积分
  • 大数据Storm平台8、Zookeeper

    前文链接:https://write-bug.com/article/2119.html
    前面很多文章已经多多少少涉及到了zookeeper,后面的文章也会更加依赖zookeeper,所以这里正式介绍一下:
    Zookeeper原理前文说过,其角色是生态中的一个权威角色,贯穿与整个生态,那么它为什么可以有效的协调不同机器之间的工作呢?
    zookeeper是一个分布式锁服务、名字服务器、分布式同步、组服务,基于Paxos协议在集群内访问任何一台机器得到result都是一样的。(Google内部实现叫Chubby)
    我们在单机开发的时候涉及到锁的是多线程开发:内存锁,互斥锁,读写锁等等,在一个进程内部对公共资源进行竞争,而一个进程空间的内存地址是一致的,所以导致不同的线程对同一块进程空间都是有操作权限的,代码与代码段有公共区域读写的时候会造成混乱,需要有一个锁来控制顺序关系。

    在分布式系统中,就不能通过几行代码去写一个锁解决机器与机器之间的协调和大量数据了,机器之间是独立隔离的,为了防止分布式系统中多个进程之间互相干扰需要有个进程进行调度,这个技术核心就类似于分布式锁服务。
    核心问题:没有一个全局的主控,协调和控制中心
    我们需要一个松散耦合(对硬件依赖不强)的分布式系统中粗粒度锁(粗到节点顺序工作就可以)以及可靠性存储(低容量存储数据)的系统去解决这个问题。
    数据模型:与标准文件系统很相似,但不能像Linux一样 : cd ..
    只能通过一个绝对路径去访问某一个节点,这有点像HDFS操作命令的路径,并且在zookeeper这么一个节点里面,它可以存储一些数据,并且他自然而然就给你配一些属性,这个属性就关于你数据的长度,创建时间,修改时间等等,然后你这个节点具有文件属性又有路径的一个特点,即它可以存数据,又可以通过一个绝对路径能够访问到指定的节点。
    在Zookeeper里面没有文件和路径的说法的,其实每个文件和目录都是一个节点
    那说到节点的话,这节点就有一定的属性了,节点属性有四个:

    Persistent Nodes:永久有效的节点,只要不手动删除(Client显式的删除),系统不崩溃文件永久存在
    Ephemeral Nodes:临时节点,仅在创建该节点client保持连接期间有效(心跳机制),一点连接丢失,zookeeper会自动删除该节点,(机器挂掉的时候所连接创建的该目录被删除,即使机器恢复也只能再次创建才可以)
    Sequence Nodes:(名字服务器(名字不重复))顺序节点,client申请创建该节点时,zookeeper会自动在节点路径末尾添加递增序列号,这种类型是实现分布式锁,分布式queue等特殊功能的关键;不允许单独存在,需要和前面两种任何一种同时存在

    监控机制

    getChildren():zookeeper是有监控功能的,可以监控某台机器是否生效,比如可以应用在HDFS2.0的主备切换机制里,如果主挂了,就可以监控并启动备份节点了,这个2.0中介绍的很详细了,那么如何监控到主挂了呢,就是通过临时节点监控这个机器节点的,如果这个机器一旦出现异常,临时节点就消失了,那么感知到节点消失这个事件发生的就是消失节点的父节点,再由父节点主动去上报备用节点或者不同场景的上层服务器节点(比如流量分发器), 比如网民访问新浪首页,流量分发器同时分发50%流量给两台服务器,但是server1挂了,创建的节点消失了,那流量分发器如何检测到server1挂了呢?父节点会主动上报(数据里面可以写是连接的哪个连接IP地址,其实流量分发器可以把父节点下的所有节点遍历一遍,就可以知道映射和连接的那个节点IP就知道哪个服务器挂了,不给分发流量就行了),以上是一个getChildren()的监控
    getData():节点数据发生变化的监控
    exists():节点是否存在

    三个关键点:

    一次性监控,被触发后,需要重新设置(只上报一次,上报一次后节点挂了或者重新恢复节点时候,父节点已经不会通知了,每次需要触发的时候就要重新设置)
    保证先收到事件,再收到数据修改的信息
    传递性,如create或者delete会触发节点监控点,同时也会触发父节点的监控点

    风险:

    客户端看不到所有数据变化,比如说网络原因(比如由于网络IO接收不到监控变化)
    多个事件的监控,有可能只会触发一次。一个客户端设置了关于某个数据点exists和get Data的监控,则当该数据被删除的时候只会触发被删除的通知
    客户端网络中断的过程无法收到监控的窗口时间,要由模块进行容错设计

    数据访问权限
    zookeeper本身提供了ACL机制,表示为 scheme: id: permissions,第一个字段表示采用哪一种机制,第二个id表示用户,permissions表示相关权限(如只读,读写,管理等),每个节点上的“访问控制连”(ACL,Access Control List)保存了各客户端对于该节点的访问权限
    例:
    IP:19.22.0.0/16,READ 表示IP地址以19.22开头的主机有该数据节点的读权限
    权限:

    CREATE 有创建子节点的权限
    READ 有读取节点数据和子节点列表的权限
    WRITE 有修改节点数据的权限,无创建和删除子节点的权限
    DELETE 有删除子节点的权限
    ADMIN 有设置节点权限的权限

    模式机制:

    World 它下面只有一个id, 叫anyone, world: anyone代表任何人,zookeeper 中对所有人有权限的结点就是属于world: anyone的
    Auth 已经被认证的用户(可以用过用户名:密码的方式,kerberos)
    Digest 通过username:password字符串的MD5编码认证用户
    Host 匹配主机名后缀,如,host: corp.com匹配host: host1.corp.com, host: host2.corp.com,但不能匹配host: host1.store.com
    IP 通过IP识别用户,表达式格式为 addr/bits

    public class NewDigest {public static void main(String[] args) throws Exception {List<ACL> acls = new ArrayList<ACL>(); //添加第一个id,采用用户名密码形式 Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin"));ACL acl1 = new ACL(ZooDefs.Perms.ALL, id1);acls.add(acl1); //添加第二个id,所有用户可读权限Id id2 = new Id("world", "anyone");ACL acl2 = new ACL(ZooDefs.Perms.READ, id2);acls.add(acl2);// zk用admin认证,创建/test ZNode。 ZooKeeper zk = new ZooKeeper("host1:2181,host2:2181,host3:2181", 2000, null); zk.addAuthInfo("digest", "admin:admin".getBytes());zk.create("/test", "data".getBytes(), acls, CreateMode.PERSISTENT); }}
    API参考:http://zookeeper.apache.org/doc/r3.3.3/api/org/apache/zookeeper/ZooKeeper.html应用

    配置管理

    更新配置文件(过滤,地域等等策略),人工操作单机机器多时过麻烦,脚本配置机器压力过大,而在zookeeper集群中只需要sever都访问这个节点更新配置就好了,虽然是个一个节点,但背后是个集群,压力会被集群后的机器自然的分散开。

    集群管理

    监控机器状态-》比如临时节点getChildren()遍历选主-》临时节点+顺序节点:选择当前是最小编号的 Server 为 Master ,主节点挂掉时,临时节点消失,选新主节点自动加1,成为新主节点,当前的节点列表中又出现一个最小编号的节点,原主节点复活后想成为主节点,需要创建新节点尾号再加1成为一个普通节点。
    共享锁服务


    控制不同节点顺序(粗粒度)协同,与选主类似,先处理最小编号节点进程任务

    队列管理
    同步队列 • 所有成员都聚齐才可使用—getChildren方式父节点通知FIFO队列 • 生产消费者—最小编号先处理

    zookeeper安装
    参考:http://note.youdao.com/noteshare?id=168883c03a9eb0d7b2e0c5a0a0216e1a&sub=963C8CD231AD4BF5ACE97C2FCAD96431
    实践
    基本操作命令
    执行客户端 zkCli.sh

    ls / 查看当前目录
    create /text “test” 创建节点
    create -e /text “test” 创建临时节点
    reate -s /text “test” 创建序列节点
    get /test 查看节点

    Java代码操作
    1 留言 2019-02-09 14:24:57 奖励15点积分
显示 0 到 25 ,共 25 条

热门回复

eject